Flink / FLINK-19401

Job stuck in restart loop due to excessive checkpoint recoveries which block the JobMaster

Details

    Description

      A Flink job sometimes gets stuck in a restart loop for many hours and cannot recover until it is redeployed. An issue with Kafka initially caused the job to restart.

      Below is the first of many "ResourceManagerException: Could not find registered job manager" exceptions.

      2020-09-19 00:03:31,614 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [flink-akka.actor.default-dispatcher-35973]  - Requesting new slot [SlotRequestId{171f1df017dab3a42c032abd07908b9b}] and profile ResourceProfile{UNKNOWN} from resource manager.
      2020-09-19 00:03:31,615 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [flink-akka.actor.default-dispatcher-35973]  - Requesting new slot [SlotRequestId{cc7d136c4ce1f32285edd4928e3ab2e2}] and profile ResourceProfile{UNKNOWN} from resource manager.
      2020-09-19 00:03:31,615 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [flink-akka.actor.default-dispatcher-35973]  - Requesting new slot [SlotRequestId{024c8a48dafaf8f07c49dd4320d5cc94}] and profile ResourceProfile{UNKNOWN} from resource manager.
      2020-09-19 00:03:31,615 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [flink-akka.actor.default-dispatcher-35973]  - Requesting new slot [SlotRequestId{a591eda805b3081ad2767f5641d0db06}] and profile ResourceProfile{UNKNOWN} from resource manager.
      2020-09-19 00:03:31,620 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [flink-akka.actor.default-dispatcher-35973]  - Source: k2-csevpc -> k2-csevpcRaw -> (vhsPlaybackEvents -> Flat Map, merchImpressionsClientLog -> Flat Map) (56/640) (1b0d3dd1f19890886ff373a3f08809e8) switched from SCHEDULED to FAILED.
      java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
              at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:433)
              at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
              at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
              at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
              at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
              at org.apache.flink.runtime.concurrent.FutureUtils.lambda$forward$21(FutureUtils.java:1065)
              at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
              at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
              at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
              at org.apache.flink.runtime.concurrent.FutureUtils.forward(FutureUtils.java:1063)
              at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager.createRootSlot(SlotSharingManager.java:155)
              at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateMultiTaskSlot(SchedulerImpl.java:511)
              at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSharedSlot(SchedulerImpl.java:311)
              at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.internalAllocateSlot(SchedulerImpl.java:160)
              at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSlotInternal(SchedulerImpl.java:143)
              at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSlot(SchedulerImpl.java:113)
              at org.apache.flink.runtime.executiongraph.SlotProviderStrategy$NormalSlotProviderStrategy.allocateSlot(SlotProviderStrategy.java:115)
              at org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator.lambda$allocateSlotsFor$0(DefaultExecutionSlotAllocator.java:104)
              at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
              at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
              at org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator.allocateSlotsFor(DefaultExecutionSlotAllocator.java:102)
              at org.apache.flink.runtime.scheduler.DefaultScheduler.allocateSlots(DefaultScheduler.java:342)
              at org.apache.flink.runtime.scheduler.DefaultScheduler.allocateSlotsAndDeploy(DefaultScheduler.java:311)
              at org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy.allocateSlotsAndDeploy(EagerSchedulingStrategy.java:76)
              at org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy.restartTasks(EagerSchedulingStrategy.java:57)
              at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$1(DefaultScheduler.java:268)
              at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
              at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701)
              at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
              at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
              at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
              at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
              at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
              at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
              at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
              at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
              at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
              at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
              at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
              at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
              at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
              at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
              at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
              at akka.actor.ActorCell.invoke(ActorCell.scala:561)
              at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
              at akka.dispatch.Mailbox.run(Mailbox.scala:225)
              at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
              at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
              at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
              at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
              at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
              at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
              at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
              at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
              at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
              at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
              at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestNewAllocatedSlot(SlotPoolImpl.java:438)
              at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.requestNewAllocatedSlot(SchedulerImpl.java:236)
              at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateMultiTaskSlot(SchedulerImpl.java:506)
              ... 39 more
      Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
              at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.slotRequestToResourceManagerFailed(SlotPoolImpl.java:360)
              at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:348)
              at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
              at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
              at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
              at org.apache.flink.runtime.concurrent.FutureUtils.whenCompleteAsyncIfNotDone(FutureUtils.java:941)
              at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestSlotFromResourceManager(SlotPoolImpl.java:342)
              at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestNewAllocatedSlotInternal(SlotPoolImpl.java:309)
              at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestNewAllocatedSlot(SlotPoolImpl.java:437)
              ... 41 more
      Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not find registered job manager for job 70216adbeed914b35d77717c4b7b13ea.
              at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
              at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
              at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
              at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
              at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
              at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:214)
              at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
              at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78)
              at com.sun.proxy.$Proxy94.requestSlot(Unknown Source)
              at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.requestSlotFromResourceManager(SlotPoolImpl.java:337)
              ... 43 more
      Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not find registered job manager for job 70216adbeed914b35d77717c4b7b13ea.
              at org.apache.flink.runtime.resourcemanager.ResourceManager.requestSlot(ResourceManager.java:443)
              at sun.reflect.GeneratedMethodAccessor135.invoke(Unknown Source)
              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
              at java.lang.reflect.Method.invoke(Method.java:498)
              at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
              at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
              ... 20 more
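
      In a trace this deep, the last "Caused by:" entry is the one that names the underlying failure. As a rough illustration only, here is a minimal Python sketch that extracts the cause chain from such a trace. The function name `cause_chain` is hypothetical, and the sample text is abbreviated from the trace above rather than a complete copy of it.

```python
import re

# Matches "Caused by: <exception class>: <message>" lines of a Java stack trace.
CAUSED_BY = re.compile(r"^\s*Caused by: ([\w.$]+): (.*)$")


def cause_chain(trace: str) -> list[tuple[str, str]]:
    """Return (exception class, message) for each 'Caused by:' line, outermost first."""
    chain = []
    for line in trace.splitlines():
        match = CAUSED_BY.match(line)
        if match:
            chain.append((match.group(1), match.group(2)))
    return chain


# Abbreviated sample: only two of the "Caused by:" lines from the trace are kept.
SAMPLE = """\
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
        ... 41 more
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not find registered job manager for job 70216adbeed914b35d77717c4b7b13ea.
        ... 20 more
"""

# The last entry in the chain is the innermost (root) cause.
root_class, root_message = cause_chain(SAMPLE)[-1]
print(root_class.rsplit(".", 1)[-1], "-", root_message)
```

      Run against the full trace, the last entry would be the ResourceManagerException raised in ResourceManager.requestSlot.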
      

      I grepped through our logs. The job manager registration happened only once, when the job was deployed a few days earlier.

      2020-09-16 17:23:28,081 INFO  com.netflix.spaas.runtime.resourcemanager.TitusResourceManager [flink-akka.actor.default-dispatcher-60]  - Registered job manager a7263fdf0cd75d4d6481858f89894876@akka.tcp://flink@100.118.253.133:43917/user/jobmanager_0 for job 70216adbeed914b35d77717c4b7b13ea.
      

      Then, about 30 minutes after the first "Could not find registered job manager" error, there was a flurry of 9 registrations within the same millisecond. The issue persisted for many hours after this. Because this is a pre-prod job, we had no alert on it.

      2020-09-19 00:33:07,827 INFO  com.netflix.spaas.runtime.resourcemanager.TitusResourceManager [flink-akka.actor.default-dispatcher-35963]  - Registered job manager a7263fdf0cd75d4d6481858f89894876@akka.tcp://flink@100.118.253.133:43917/user/jobmanager_0 for job 70216adbeed914b35d77717c4b7b13ea.
      2020-09-19 00:33:07,827 INFO  com.netflix.spaas.runtime.resourcemanager.TitusResourceManager [flink-akka.actor.default-dispatcher-35963]  - Registered job manager a7263fdf0cd75d4d6481858f89894876@akka.tcp://flink@100.118.253.133:43917/user/jobmanager_0 for job 70216adbeed914b35d77717c4b7b13ea.
      2020-09-19 00:33:07,827 INFO  com.netflix.spaas.runtime.resourcemanager.TitusResourceManager [flink-akka.actor.default-dispatcher-35963]  - Registered job manager a7263fdf0cd75d4d6481858f89894876@akka.tcp://flink@100.118.253.133:43917/user/jobmanager_0 for job 70216adbeed914b35d77717c4b7b13ea.
      2020-09-19 00:33:07,827 INFO  com.netflix.spaas.runtime.resourcemanager.TitusResourceManager [flink-akka.actor.default-dispatcher-35963]  - Registered job manager a7263fdf0cd75d4d6481858f89894876@akka.tcp://flink@100.118.253.133:43917/user/jobmanager_0 for job 70216adbeed914b35d77717c4b7b13ea.
      2020-09-19 00:33:07,827 INFO  com.netflix.spaas.runtime.resourcemanager.TitusResourceManager [flink-akka.actor.default-dispatcher-35963]  - Registered job manager a7263fdf0cd75d4d6481858f89894876@akka.tcp://flink@100.118.253.133:43917/user/jobmanager_0 for job 70216adbeed914b35d77717c4b7b13ea.
      2020-09-19 00:33:07,827 INFO  com.netflix.spaas.runtime.resourcemanager.TitusResourceManager [flink-akka.actor.default-dispatcher-35963]  - Registered job manager a7263fdf0cd75d4d6481858f89894876@akka.tcp://flink@100.118.253.133:43917/user/jobmanager_0 for job 70216adbeed914b35d77717c4b7b13ea.
      2020-09-19 00:33:07,827 INFO  com.netflix.spaas.runtime.resourcemanager.TitusResourceManager [flink-akka.actor.default-dispatcher-35963]  - Registered job manager a7263fdf0cd75d4d6481858f89894876@akka.tcp://flink@100.118.253.133:43917/user/jobmanager_0 for job 70216adbeed914b35d77717c4b7b13ea.
      2020-09-19 00:33:07,827 INFO  com.netflix.spaas.runtime.resourcemanager.TitusResourceManager [flink-akka.actor.default-dispatcher-35963]  - Registered job manager a7263fdf0cd75d4d6481858f89894876@akka.tcp://flink@100.118.253.133:43917/user/jobmanager_0 for job 70216adbeed914b35d77717c4b7b13ea.
      2020-09-19 00:33:07,827 INFO  com.netflix.spaas.runtime.resourcemanager.TitusResourceManager [flink-akka.actor.default-dispatcher-35963]  - Registered job manager a7263fdf0cd75d4d6481858f89894876@akka.tcp://flink@100.118.253.133:43917/user/jobmanager_0 for job 70216adbeed914b35d77717c4b7b13ea.
      2020-09-19 00:33:07,828 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [flink-akka.actor.default-dispatcher-35968]  - JobManager successfully registered at ResourceManager, leader id: bf239dac186bc8ba901a8702f4bb42e3.
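
      The registration counts above came from grepping the logs by hand. A minimal Python sketch of the same check, counting "Registered job manager" lines per millisecond timestamp, might look like the following. The function name `registrations_per_timestamp` and the `REG_TEMPLATE` sample lines are hypothetical; the sample lines are shortened stand-ins for the log excerpts above, not real log output.

```python
import re
from collections import Counter

# Timestamp at the start of a log line, e.g. "2020-09-19 00:33:07,827".
TIMESTAMP = re.compile(r"^\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\b")


def registrations_per_timestamp(lines, job_id):
    """Count 'Registered job manager ... for job <job_id>' lines per timestamp."""
    counts = Counter()
    for line in lines:
        if "Registered job manager" in line and f"for job {job_id}" in line:
            match = TIMESTAMP.match(line)
            if match:
                counts[match.group(1)] += 1
    return counts


JOB_ID = "70216adbeed914b35d77717c4b7b13ea"

# Shortened stand-in for a registration log line (thread name and address omitted).
REG_TEMPLATE = "{ts} INFO  TitusResourceManager - Registered job manager for job " + JOB_ID + "."

sample_lines = (
    [REG_TEMPLATE.format(ts="2020-09-16 17:23:28,081")]
    + [REG_TEMPLATE.format(ts="2020-09-19 00:33:07,827")] * 9
    + ["2020-09-19 00:33:07,828 INFO  JobMaster - JobManager successfully registered at ResourceManager"]
)

for timestamp, count in sorted(registrations_per_timestamp(sample_lines, JOB_ID).items()):
    print(timestamp, count)
```

      Any timestamp with a count above 1 points to repeated registrations like the burst shown above.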
      

      I have the INFO-level job manager logs for that hour and can share them offline if needed.
       

            People

              roman Roman Khachatryan
              stevenz3wu Steven Zhen Wu