  1. Flink
  2. FLINK-19141

Flink Job Submitted on Yarn Does not Allocate Task Manager

Details

    Description

      1. Create a Flink cluster on YARN.

      2. Submit my job.

      3. The job manager logs the exception shown below.

      4. However, the same job works with flink-1.10.1.

       

      2020-09-07 03:14:46,951 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting execution of job comment_stat (31c1814bfb2332beb32eb1aea887ea99) under job master id a64310a58e5c2f684f15be79b4694156.
      2020-09-07 03:14:46,952 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
      2020-09-07 03:14:46,953 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job comment_stat (31c1814bfb2332beb32eb1aea887ea99) switched from state CREATED to RUNNING.
      2020-09-07 03:14:47,116 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) (880ec8a17712f8421f85c05173e2fe0c) switched from CREATED to SCHEDULED.
      2020-09-07 03:14:47,116 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (2/2) (7def5d2d92538e3885a4162e50b8fb91) switched from CREATED to SCHEDULED.
      2020-09-07 03:14:47,116 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Window(TumblingEventTimeWindows(300000), EventTimeTrigger, ScalaReduceFunction, PassThroughWindowFunction) -> Map (1/2) (9ae6819476dd931b244c9ff71b030e51) switched from CREATED to SCHEDULED.
      2020-09-07 03:14:47,116 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Window(TumblingEventTimeWindows(300000), EventTimeTrigger, ScalaReduceFunction, PassThroughWindowFunction) -> Map (2/2) (3dc9aaefcc6cb76e1c92fb12344f40b3) switched from CREATED to SCHEDULED.
      2020-09-07 03:14:47,116 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Window(TumblingEventTimeWindows(900000), EventTimeTrigger, ScalaReduceFunction, PassThroughWindowFunction) -> Map -> Sink: CommentStat-ESSink (1/2) (26a9d7e370d4bdcb700d75dc8c17e6bd) switched from CREATED to SCHEDULED.
      2020-09-07 03:14:47,116 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Window(TumblingEventTimeWindows(900000), EventTimeTrigger, ScalaReduceFunction, PassThroughWindowFunction) -> Map -> Sink: CommentStat-ESSink (2/2) (e49459cbfbcbde0790439e031325afda) switched from CREATED to SCHEDULED.
      2020-09-07 03:14:47,128 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{5506c59a4da17024607d0c47a5b2ca45}]
      2020-09-07 03:14:47,132 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{9a7d66f099a74f5d2e31107c4a95e782}]
      2020-09-07 03:14:47,135 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Connecting to ResourceManager akka.tcp://flink@n44-15.fn.ams.osa:44173/user/rpc/resourcemanager_0(bbd23781fedbd5f4f41f4524277c4d77)
      2020-09-07 03:14:47,139 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Resolved ResourceManager address, beginning registration
      2020-09-07 03:14:47,142 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - Starting ZooKeeperLeaderRetrievalService /leader/31c1814bfb2332beb32eb1aea887ea99/job_manager_lock.
      2020-09-07 03:14:47,142 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Registering job manager a64310a58e5c2f684f15be79b4694156@akka.tcp://flink@n44-15.fn.ams.osa:44173/user/rpc/jobmanager_2 for job 31c1814bfb2332beb32eb1aea887ea99.
      2020-09-07 03:14:47,146 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Registered job manager a64310a58e5c2f684f15be79b4694156@akka.tcp://flink@n44-15.fn.ams.osa:44173/user/rpc/jobmanager_2 for job 31c1814bfb2332beb32eb1aea887ea99.
      2020-09-07 03:14:47,148 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - JobManager successfully registered at ResourceManager, leader id: bbd23781fedbd5f4f41f4524277c4d77.
      2020-09-07 03:14:47,148 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] - Requesting new slot [SlotRequestId{5506c59a4da17024607d0c47a5b2ca45}] and profile ResourceProfile{UNKNOWN} from resource manager.
      2020-09-07 03:14:47,149 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Request slot with profile ResourceProfile{UNKNOWN} for job 31c1814bfb2332beb32eb1aea887ea99 with allocation id 09293aa820cea4f58bdf40f5efbaeb6d.
      2020-09-07 03:14:47,149 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] - Requesting new slot [SlotRequestId{9a7d66f099a74f5d2e31107c4a95e782}] and profile ResourceProfile{UNKNOWN} from resource manager.
      2020-09-07 03:14:47,160 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Requesting new TaskExecutor container with resource WorkerResourceSpec {cpuCores=2.0, taskHeapSize=5.200gb (5583457416 bytes), taskOffHeapSize=0 bytes, networkMemSize=1024.000mb (1073741824 bytes), managedMemSize=4.300gb (4617089912 bytes)}. Number pending workers of this resource is 1.
      2020-09-07 03:14:47,161 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Request slot with profile ResourceProfile{UNKNOWN} for job 31c1814bfb2332beb32eb1aea887ea99 with allocation id df78cb5041da900696ab8f505a235c94.
      2020-09-07 03:14:49,246 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:14:52,286 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl        [] - Received new token for : n49-04.fn.ams.osa:45454
      2020-09-07 03:14:52,289 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Received 1 containers.
      2020-09-07 03:14:52,292 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Received 1 containers with resource <memory:17920, vCores:2>, 0 pending container requests.
      2020-09-07 03:14:52,295 ERROR org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor         [] - Caught exception while executing runnable in main thread.
      java.lang.IllegalStateException: The RMClient's and YarnResourceManagers internal state about the number of pending container requests for resource <memory:17920, vCores:2> has diverged. Number client's pending container requests 1 != Number RM's pending container requests 0.
         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:217) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at org.apache.flink.yarn.YarnResourceManager.getPendingRequestsAndCheckConsistency(YarnResourceManager.java:517) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at org.apache.flink.yarn.YarnResourceManager.onContainersOfResourceAllocated(YarnResourceManager.java:427) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:397) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.1.jar:1.11.1]
      2020-09-07 03:14:59,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:15:09,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:15:19,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:15:29,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:15:39,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:15:49,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:15:59,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:16:09,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:16:19,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:16:29,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:16:39,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:16:49,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:16:59,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:17:09,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:17:19,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:17:29,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:17:39,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:17:49,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:17:59,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:18:09,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:18:19,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:18:29,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:18:39,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:18:49,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:18:59,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:19:09,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:19:19,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:19:29,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:19:39,245 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint triggering task Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) of job 31c1814bfb2332beb32eb1aea887ea99 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      2020-09-07 03:19:47,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Filter -> Map -> Filter -> Timestamps/Watermarks (1/2) (880ec8a17712f8421f85c05173e2fe0c) switched from SCHEDULED to FAILED on not deployed.
      org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources.
         at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_77]
         at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:168) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_77]
         at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:726) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:432) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_77]
         at org.apache.flink.runtime.concurrent.FutureUtils.lambda$forwardTo$21(FutureUtils.java:1120) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_77]
         at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1036) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.1.jar:1.11.1]
         at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.1.jar:1.11.1]
      Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
         at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) ~[?:1.8.0_77]
         at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_77]
         ... 25 more
      Caused by: java.util.concurrent.TimeoutException
         ... 23 more
      

       

          Activity

            ShawnHx Xiao Huang added a comment -

            Hi, yunhui.

            According to the log, it seems there are not enough slots to run this job.

            For streaming jobs, the default slot request timeout is 300s. If the scheduler cannot get the needed slots within 300s, it throws this exception.

            You can try to decrease the memory per TaskManager or increase the number of slots per TaskManager to raise the total number of slots in the cluster.

            Hope this can solve your problem
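
For reference, the settings mentioned in this comment correspond to the following Flink configuration options. This is only a sketch of a flink-conf.yaml fragment: the option names are standard Flink keys, but the values are illustrative assumptions, not recommendations for this cluster.

```yaml
# Slot request timeout in milliseconds (300000 ms = the 300s default mentioned above).
slot.request.timeout: 300000

# Slots offered by each TaskManager; a higher value yields more slots per container.
taskmanager.numberOfTaskSlots: 4

# Total memory of each TaskManager process; a smaller value requests smaller YARN containers.
# "8g" is an assumed example value.
taskmanager.memory.process.size: 8g
```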

            xtsong Xintong Song added a comment -

            Hi yunhui,

            It is probably a known issue, FLINK-19151. Can you verify that FairScheduler or SLSFairScheduler is used by Yarn?

            Unfortunately, the fix is only included in Flink 1.12.0 and 1.11.3, both of which are unreleased at the moment.
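
The "has diverged" exception in the log can be illustrated with a small stand-alone sketch. It assumes, as I understand FLINK-19151, that Flink predicts the container size by rounding up to YARN's minimum allocation, while the FairScheduler rounds up to a separate increment setting, so the returned container can be larger than predicted. The class, method names, and all numbers below are hypothetical; this is not Flink's or YARN's code, and the actual cluster settings are not given in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only. All names and numbers are hypothetical. */
final class PendingRequestDivergence {

    /** Rounds memoryMb up to the next multiple of stepMb. */
    public static int roundUp(int memoryMb, int stepMb) {
        return ((memoryMb + stepMb - 1) / stepMb) * stepMb;
    }

    /** Requester-side bookkeeping: pending workers keyed by the exact container size expected. */
    public static int pendingByExactSize(Map<Integer, Integer> pendingBySize, int receivedMb) {
        return pendingBySize.getOrDefault(receivedMb, 0);
    }

    /** Client-side view: counts outstanding requests that fit into the received container. */
    public static int pendingThatFit(List<Integer> outstandingRequestsMb, int receivedMb) {
        int count = 0;
        for (int requestMb : outstandingRequestsMb) {
            if (requestMb <= receivedMb) {
                count++;
            }
        }
        return count;
    }

    /** Simulates one container request; returns {clientPending, requesterPending}. */
    public static int[] simulate(int processMb, int minAllocationMb, int incrementMb) {
        // Assumption: the requester rounds up to the minimum allocation.
        int expectedMb = roundUp(processMb, minAllocationMb);
        // Assumption: the scheduler rounds the request up to its increment instead.
        int receivedMb = Math.max(minAllocationMb, roundUp(expectedMb, incrementMb));

        Map<Integer, Integer> pendingBySize = new HashMap<>();
        pendingBySize.put(expectedMb, 1);
        List<Integer> outstanding = new ArrayList<>();
        outstanding.add(expectedMb);

        return new int[] {
            pendingThatFit(outstanding, receivedMb),
            pendingByExactSize(pendingBySize, receivedMb)
        };
    }

    public static void main(String[] args) {
        // Increment differs from the minimum allocation: the two counts disagree.
        int[] diverged = simulate(4500, 256, 1024);
        System.out.println("client=" + diverged[0] + " requester=" + diverged[1]);

        // Increment equals the minimum allocation: both sides agree.
        int[] consistent = simulate(4500, 1024, 1024);
        System.out.println("client=" + consistent[0] + " requester=" + consistent[1]);
    }
}
```

In the first call the client still sees one matching request while the exact-size bookkeeping finds none, which mirrors the "1 != 0" in the logged message. Once that check fails, the container is never assigned to a worker, which is consistent with the slot request timing out 300s later.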

            yunhui Yunhui added a comment -

            Hi, xintongsong,

            Yes, FairScheduler is used by Yarn.

            Thank you for your response.

            xtsong Xintong Song added a comment -

            Thanks for the confirmation, yunhui.
            I'm closing this ticket since it duplicates FLINK-19151.


            People

              Assignee: Unassigned
              Reporter: Yunhui (yunhui)