Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-18220

Improve exception if finalizeOnMaster hits OOM

    XMLWordPrintableJSON

Details

    Description

      When an OutputFormat implements FinalizeOnMaster and causes an OOM on the master (JobManager), the resulting exception is not enriched with additional pointers to the relevant config options.

      org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:208)
      	at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyGlobalFailure(UpdateSchedulerNgOnInternalFailuresListener.java:58)
      	at org.apache.flink.runtime.executiongraph.ExecutionGraph.failGlobal(ExecutionGraph.java:1042)
      	at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1291)
      	at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:870)
      	at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1125)
      	at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1491)
      	at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1464)
      	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:497)
      	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
      	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.lang.Exception: Failed to finalize execution on master
      	... 33 more
      Caused by: java.lang.OutOfMemoryError: Java heap space
      	at org.apache.flink.examples.java.wordcount.WordCount.createOOM(WordCount.java:159)
      	at org.apache.flink.examples.java.wordcount.WordCount.access$000(WordCount.java:53)
      	at org.apache.flink.examples.java.wordcount.WordCount$MyOutputFormat.finalizeGlobal(WordCount.java:107)
      	at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132)
      	at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286)
      	at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:870)
      	at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1125)
      	at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1491)
      	at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1464)
      	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:497)
      	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$$Lambda$158/0x0000000840388440.apply(Unknown Source)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
      

      Attachments

        Issue Links

          Activity

            People

              mapohl Matthias Pohl
              chesnay Chesnay Schepler
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: