Details
Type: Bug
Status: Closed
Priority: Critical
Resolution: Won't Fix
Description
While running the PageRank example in Flink, I accidentally tried to collect() 125 MB of data to my client via accumulators.
From a user's perspective, my job failed with this exception:
2020-11-18 12:56:06,897 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - DataSink (collect()) (3/4) (a1389a18cccabe10339099064032d6b8) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@6af1e09b.
org.apache.flink.util.FlinkException: Execution a1389a18cccabe10339099064032d6b8 is unexpectedly no longer running on task executor 192.168.1.25:56111-928d60.
    at org.apache.flink.runtime.jobmaster.JobMaster$1.onMissingDeploymentsOf(JobMaster.java:248) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentReconciler.reconcileExecutionDeployments(DefaultExecutionDeploymentReconciler.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.reportPayload(JobMaster.java:1248) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.reportPayload(JobMaster.java:1235) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.receiveHeartbeat(HeartbeatManagerImpl.java:199) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.heartbeatFromTaskManager(JobMaster.java:686) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_222]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_222]
The root cause for this problem is the following exception on the TaskManager:
2020-11-18 12:56:05,972 ERROR org.apache.flink.runtime.rpc.akka.AkkaRpcActor [] - Caught exception while executing runnable in main thread.
java.lang.reflect.UndeclaredThrowableException: null
    at com.sun.proxy.$Proxy25.updateTaskExecutionState(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1563) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1593) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2400(TaskExecutor.java:174) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1925) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: java.io.IOException: The rpc invocation size 125349547 exceeds the maximum akka framesize.
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    ... 26 more
It seems that the purely coordination-related updateTaskExecutionState call doesn't go through because user data (the accumulator contents) is piggybacking on this call and pushes the RPC message past the maximum akka framesize.
I see the following options for solving this:
a) Don't use the coordination RPC system for transferring user payload
b) Use separate RPC calls for transferring user payload
c) Limit the size of accumulators that can be attached to the TaskExecutionState
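
To make option c) a bit more concrete, here is a rough sketch of what such a size guard could look like. This is not Flink code: the class AccumulatorSizeGuard, the method filterBySize, and the 64 KiB limit used in main are all hypothetical names and values chosen for illustration, and plain Java serialization stands in for whatever serialization the runtime actually applies to accumulators.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical guard for option c); not part of Flink's API. */
public class AccumulatorSizeGuard {

    /** Returns the Java-serialized size of a value in bytes. */
    static long serializedSize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.size();
    }

    /**
     * Keeps accumulators, in iteration order, as long as the running total of
     * their serialized sizes stays within maxBytes. An accumulator that would
     * push the total over the limit is skipped, so the coordination message
     * it would be attached to stays small.
     */
    static Map<String, Serializable> filterBySize(
            Map<String, Serializable> accumulators, long maxBytes) throws IOException {
        Map<String, Serializable> kept = new LinkedHashMap<>();
        long total = 0;
        for (Map.Entry<String, Serializable> entry : accumulators.entrySet()) {
            long size = serializedSize(entry.getValue());
            if (total + size <= maxBytes) {
                kept.put(entry.getKey(), entry.getValue());
                total += size;
            }
        }
        return kept;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Serializable> accumulators = new LinkedHashMap<>();
        accumulators.put("small-counter", 42L);
        accumulators.put("collected-data", new byte[1024 * 1024]); // 1 MiB of user data

        // Assumed limit for illustration only; a real limit would have to be
        // derived from the configured framesize.
        Map<String, Serializable> kept = filterBySize(accumulators, 64 * 1024);
        System.out.println(kept.keySet());
    }
}
```

Silently skipping oversized accumulators is only one possible policy. Failing the task with a clear error message that names the offending accumulator would probably be friendlier to the user than the exception above.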