Flink / FLINK-17921

RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 1.8.1, 1.10.1
    • None
    • None

    Description

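      As described in the summary, RpcGlobalAggregateManager#updateGlobalAggregate can fail with an akka timeout (akka.pattern.AskTimeoutException). That is not the error message we want, because it does not tell the user what actually went wrong.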

      If org.apache.flink.api.common.functions.AggregateFunction#getResult returns null and that function is used in RpcGlobalAggregateManager#updateGlobalAggregate, the call fails with the exception below, which is not the error one would expect from this cause. Increasing akka.ask.timeout does not help; the exception still occurs.

      java.io.IOException: Error updating global aggregate.
      at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:47)
      at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: java.util.concurrent.ExecutionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
      at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
      at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
      ... 8 more
      Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
      at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
      at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
      at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
      at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
      at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
      at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
      at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
      at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
      at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
      at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
      at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
      

      The following thread dump shows the root cause. The key frame is CompletableFuture.waitingGet: the current thread is parked waiting for the CompletableFuture to complete, and that wait ends only when Flink's akka ask times out. Therefore, even a timeout of one hour would not solve the problem.

      java.lang.Thread.State: WAITING (parking)
      at sun.misc.Unsafe.park(Native Method)
      - parking to wait for <0x00000007b76617a8> (a java.util.concurrent.CompletableFuture$Signaller)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
      at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
      at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
      at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
      at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
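      A minimal plain-Java simulation of the behavior described above, assuming (as the first stack trace suggests) that no reply is ever sent and the future is failed only by the ask timeout. The class and method names (AskTimeoutSimulation, askWithoutReply, blockUntilFailure) are hypothetical; a scheduled task stands in for Akka's ask timeout, and java.util.concurrent.TimeoutException stands in for akka.pattern.AskTimeoutException. It illustrates why a larger timeout only delays the same failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AskTimeoutSimulation {

    /**
     * Hypothetical stand-in for an RPC "ask" whose reply never arrives.
     * The returned future is completed only by a scheduled timeout task,
     * mimicking how the ask fails the future once the ask timeout elapses.
     */
    public static CompletableFuture<Object> askWithoutReply(ScheduledExecutorService timer, long askTimeoutMillis) {
        CompletableFuture<Object> reply = new CompletableFuture<>();
        // Nothing ever calls reply.complete(...): the "recipient" never answers.
        timer.schedule(
                () -> reply.completeExceptionally(
                        new TimeoutException("Ask timed out after [" + askTimeoutMillis + " ms]")),
                askTimeoutMillis, TimeUnit.MILLISECONDS);
        return reply;
    }

    /** Blocks in an unbounded get(), like the caller in the thread dump, and returns how long it waited. */
    public static long blockUntilFailure(ScheduledExecutorService timer, long askTimeoutMillis)
            throws InterruptedException {
        long start = System.nanoTime();
        try {
            askWithoutReply(timer, askTimeoutMillis).get(); // parks the thread until the timeout fires
            throw new IllegalStateException("reply unexpectedly arrived");
        } catch (ExecutionException e) {
            // The visible cause is the simulated ask timeout, not the reason no reply was sent.
            System.out.println("Failed with: " + e.getCause());
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Raising the timeout does not fix anything; the same failure just arrives later.
            long shortWait = blockUntilFailure(timer, 100);
            long longWait = blockUntilFailure(timer, 300);
            System.out.println("waited ~" + shortWait + " ms, then ~" + longWait + " ms");
        } finally {
            timer.shutdownNow();
        }
    }
}
```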
      

      We propose changing RpcGlobalAggregateManager#updateGlobalAggregate to use get(long timeout, TimeUnit unit) instead of get(). That way, the timeout error would reflect the actual state of the program; the akka timeout error is too generic, which makes troubleshooting harder for users.
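      A sketch of the proposed change, assuming a hypothetical helper awaitAggregate (not Flink's actual code) that receives the future returned by the RPC call. It uses CompletableFuture#get(long, TimeUnit) and turns a TimeoutException into an IOException that names the aggregate, instead of surfacing a generic ask timeout. The aggregate name "myAggregate" used in main is also hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedAggregateWait {

    /** Hypothetical helper: waits for an aggregate result with an explicit caller-side timeout. */
    public static Object awaitAggregate(String aggregateName, CompletableFuture<Object> result,
                                        long timeout, TimeUnit unit) throws IOException {
        try {
            return result.get(timeout, unit);
        } catch (TimeoutException e) {
            // Name the aggregate and the expired wait instead of reporting a generic ask timeout.
            throw new IOException("No result for global aggregate '" + aggregateName + "' within "
                    + timeout + " " + unit
                    + "; check whether the AggregateFunction produced a result (e.g. getResult returned null).", e);
        } catch (ExecutionException e) {
            throw new IOException("Error updating global aggregate '" + aggregateName + "'.", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            throw new IOException("Interrupted while updating global aggregate '" + aggregateName + "'.", e);
        }
    }

    public static void main(String[] args) {
        // A future that never completes, standing in for a reply that never arrives.
        CompletableFuture<Object> neverCompleted = new CompletableFuture<>();
        try {
            awaitAggregate("myAggregate", neverCompleted, 200, TimeUnit.MILLISECONDS);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }

        // A normal reply is returned unchanged.
        try {
            Object value = awaitAggregate("myAggregate",
                    CompletableFuture.<Object>completedFuture(42L), 200, TimeUnit.MILLISECONDS);
            System.out.println("result: " + value);
        } catch (IOException e) {
            throw new AssertionError(e);
        }
    }
}
```

      For this message to be the one users see, the caller-side timeout would have to be shorter than akka.ask.timeout; otherwise the ask fails first and the caller still gets an ExecutionException wrapping AskTimeoutException, as in the first stack trace.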

            People

              Assignee: Unassigned
              Reporter: zhangminglei (mingleizhang)
              Votes: 0
              Watchers: 3
