Flink / FLINK-20220

DataSet.collect() uses TaskExecutionState for transferring user-payload

Details

    • Bug
    • Status: Closed
    • Critical
    • Resolution: Won't Fix

    Description

      While running the PageRank example in Flink, I accidentally called collect() on about 125 MB of data, which is sent to my client using accumulators.

      From a user's perspective, my job failed with this exception:

      2020-11-18 12:56:06,897 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - DataSink (collect()) (3/4) (a1389a18cccabe10339099064032d6b8) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@6af1e09b.
      org.apache.flink.util.FlinkException: Execution a1389a18cccabe10339099064032d6b8 is unexpectedly no longer running on task executor 192.168.1.25:56111-928d60.
              at org.apache.flink.runtime.jobmaster.JobMaster$1.onMissingDeploymentsOf(JobMaster.java:248) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentReconciler.reconcileExecutionDeployments(DefaultExecutionDeploymentReconciler.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.reportPayload(JobMaster.java:1248) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.reportPayload(JobMaster.java:1235) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.receiveHeartbeat(HeartbeatManagerImpl.java:199) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at org.apache.flink.runtime.jobmaster.JobMaster.heartbeatFromTaskManager(JobMaster.java:686) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source) ~[?:?]
              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_222]
              at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_222]
      

      The root cause for this problem is the following exception on the TaskManager:

      2020-11-18 12:56:05,972 ERROR org.apache.flink.runtime.rpc.akka.AkkaRpcActor               [] - Caught exception while executing runnable in main thread.
      java.lang.reflect.UndeclaredThrowableException: null
              at com.sun.proxy.$Proxy25.updateTaskExecutionState(Unknown Source) ~[?:?]
              at org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1563) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1593) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2400(TaskExecutor.java:174) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1925) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.12.0.jar:1.12.0]
              at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.12.0.jar:1.12.0]
      Caused by: java.io.IOException: The rpc invocation size 125349547 exceeds the maximum akka framesize.
              at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
              ... 26 more
      

      It seems that the updateTaskExecutionState call, which should be purely coordination-related, does not go through because user data is piggybacked onto it and pushes the message past the maximum akka framesize.
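To make the size mismatch concrete, here is a minimal sketch in plain Java (not Flink code) that compares the invocation size reported in the IOException with a frame limit. The limit of 10485760 bytes is an assumption taken from the documented default of `akka.framesize`; the cluster in this report may have been configured differently. The class and method names are hypothetical.

```java
/** Illustrative only: models the kind of size check that rejected the RPC; not Flink's implementation. */
class FrameSizeCheck {
    // Assumption: documented default of akka.framesize ("10485760b"); the reporter's actual setting is unknown.
    static final long ASSUMED_MAX_FRAME_BYTES = 10_485_760L;

    /** True if a message of invocationBytes would not fit into one frame. */
    static boolean exceedsFrame(long invocationBytes, long maxFrameBytes) {
        return invocationBytes > maxFrameBytes;
    }

    /** Number of frames of at most maxFrameBytes needed to carry payloadBytes (ceiling division). */
    static long framesNeeded(long payloadBytes, long maxFrameBytes) {
        return (payloadBytes + maxFrameBytes - 1) / maxFrameBytes;
    }

    public static void main(String[] args) {
        long reported = 125_349_547L; // size taken from the IOException in the TaskManager log
        System.out.println(exceedsFrame(reported, ASSUMED_MAX_FRAME_BYTES)); // prints true
        System.out.println(framesNeeded(reported, ASSUMED_MAX_FRAME_BYTES)); // prints 12
    }
}
```

Under that assumed default, the message is roughly twelve times larger than a single frame. Raising `akka.framesize` above the payload size would probably let the call through, but it is a workaround rather than a fix, because the user payload would still travel over the coordination RPC.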

      I see the following options for solving this:
      a) Don't use the coordination RPC system for transferring user payload
      b) Use separate RPC calls for transferring user payload
      c) Limit the size of accumulators that can be attached to the TaskExecutionState
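Options (b) and (c) could look roughly like the following sketch. It is plain Java with hypothetical names; nothing here is an existing Flink API, and it uses standard Java serialization only for illustration (Flink's own serialization path may differ). The chunking shown for (b) is one possible reading of "separate RPC calls", assuming each chunk would be sent in its own call.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration; none of these names exist in Flink. */
class AccumulatorPayloadSketch {

    /** Serializes a value with standard Java serialization. */
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Option (c): refuse to attach an oversized payload and name the cause in the error. */
    static byte[] checkedPayload(byte[] payload, int maxBytes) {
        if (payload.length > maxBytes) {
            throw new IllegalStateException(
                "Accumulator payload of " + payload.length
                    + " bytes exceeds the limit of " + maxBytes + " bytes");
        }
        return payload;
    }

    /** Option (b), one reading: split the payload so that each separate call stays within the limit. */
    static List<byte[]> chunk(byte[] payload, int maxBytes) {
        if (maxBytes <= 0) {
            throw new IllegalArgumentException("maxBytes must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        for (int from = 0; from < payload.length; from += maxBytes) {
            int to = Math.min(payload.length, from + maxBytes);
            chunks.add(Arrays.copyOfRange(payload, from, to));
        }
        return chunks;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = serialize(new long[1000]); // stand-in for a collected result
        int limit = 1024;                           // hypothetical per-call limit in bytes
        System.out.println("chunks: " + chunk(payload, limit).size());
        try {
            checkedPayload(payload, limit);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a guard like the one for (c), the job would fail with a message that points at the oversized accumulator instead of the misleading "unexpectedly no longer running on task executor" error shown above.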

      Attachments

        1. jobmanager-1.11.log
          461 kB
          Robert Metzger

          People

            Assignee: Unassigned
            Reporter: Robert Metzger (rmetzger)