Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-16626

Prevent REST handler from being closed more than once

    XMLWordPrintableJSON

Details

    Description

      In Flink 1.10.0 release, job cancellation can be problematic, as users frequently experience java.util.concurrent.TimeoutException at the client side, because the REST endpoint closes pre-maturely before sending out the response, this is because the jobCancellationHandler is incorrectly reused and closed twice.
       
      When executing the following command to stop a flink job with yarn per-job mode, the client keeps retrying untill timeout (1 minutes)and exit with failure. But the job stops successfully.
       Command :

      flink cancel $jobId yid appId
      

       The exception on the client side is :

       

      2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
      ...
      2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
      2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
      2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
      2020-03-17 12:33:14,077 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 thread-local buffer(s) from thread: flink-rest-client-netty-thread-1
      2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
      2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command.
      org.apache.flink.util.FlinkException: Could not cancel job cc61033484d4c0e7a27a8a2a36f4de7a.
      at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
      at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
      at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
      at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
      at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
      at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
      Caused by: java.util.concurrent.TimeoutException
      at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
      at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
      ... 9 more

      ------------------------------------------------------------
      The program finished with the following exception:

      org.apache.flink.util.FlinkException: Could not cancel job cc61033484d4c0e7a27a8a2a36f4de7a.
      at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
      at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
      at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
      at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
      at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
      at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
      Caused by: java.util.concurrent.TimeoutException
      at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
      at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
      ... 9 more

      Actually, the job was cancelled. But the server also prints some exception:

       

      2020-03-17 12:25:13,754 ERROR [flink-akka.actor.default-dispatcher-17] org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:766) - Failed to submit a listener notification task. Event loop shut down? java.util.concurrent.RejectedExecutionException: event executor terminated at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:855) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:340) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:333) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:766) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:764) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:421) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:149) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30) at org.apache.flink.runtime.rest.handler.util.HandlerUtils.sendResponse(HandlerUtils.java:224) at org.apache.flink.runtime.rest.handler.util.HandlerUtils.sendResponse(HandlerUtils.java:176) at org.apache.flink.runtime.rest.handler.util.HandlerUtils.sendResponse(HandlerUtils.java:91) at org.apache.flink.runtime.rest.handler.AbstractRestHandler.lambda$respondToRequest$0(AbstractRestHandler.java:78) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874) at akka.dispatch.OnComplete.internal(Future.scala:264) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

      Attachments

        1. patch.diff
          12 kB
          Zili Chen
        2. jobmanager.log
          412 kB
          Zili Chen

        Issue Links

          Activity

            People

              kyledong Weike Dong
              chaiyq chaiyongqiang
              Votes:
              0 Vote for this issue
              Watchers:
              12 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 20m
                  20m