Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-16626

Prevent REST handler from being closed more than once

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      In Flink 1.10.0 release, job cancellation can be problematic, as users frequently experience java.util.concurrent.TimeoutException at the client side, because the REST endpoint closes pre-maturely before sending out the response, this is because the jobCancellationHandler is incorrectly reused and closed twice.
       
      When executing the following command to stop a flink job with yarn per-job mode, the client keeps retrying untill timeout (1 minutes)and exit with failure. But the job stops successfully.
       Command :

      flink cancel $jobId yid appId
      

       The exception on the client side is :

       

      2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
      ...
      2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
      2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
      2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
      2020-03-17 12:33:14,077 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 thread-local buffer(s) from thread: flink-rest-client-netty-thread-1
      2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
      2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command.
      org.apache.flink.util.FlinkException: Could not cancel job cc61033484d4c0e7a27a8a2a36f4de7a.
      at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
      at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
      at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
      at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
      at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
      at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
      Caused by: java.util.concurrent.TimeoutException
      at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
      at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
      ... 9 more

      ------------------------------------------------------------
      The program finished with the following exception:

      org.apache.flink.util.FlinkException: Could not cancel job cc61033484d4c0e7a27a8a2a36f4de7a.
      at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
      at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
      at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
      at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
      at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
      at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
      Caused by: java.util.concurrent.TimeoutException
      at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
      at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
      ... 9 more

      Actually, the job was cancelled. But the server also prints some exception:

       

      2020-03-17 12:25:13,754 ERROR [flink-akka.actor.default-dispatcher-17] org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:766) - Failed to submit a listener notification task. Event loop shut down? java.util.concurrent.RejectedExecutionException: event executor terminated at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:855) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:340) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:333) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:766) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:764) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:421) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:149) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30) at org.apache.flink.runtime.rest.handler.util.HandlerUtils.sendResponse(HandlerUtils.java:224) at org.apache.flink.runtime.rest.handler.util.HandlerUtils.sendResponse(HandlerUtils.java:176) at org.apache.flink.runtime.rest.handler.util.HandlerUtils.sendResponse(HandlerUtils.java:91) at org.apache.flink.runtime.rest.handler.AbstractRestHandler.lambda$respondToRequest$0(AbstractRestHandler.java:78) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874) at akka.dispatch.OnComplete.internal(Future.scala:264) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

      Attachments

        Issue Links

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            kyledong Weike Dong
            chaiyq chaiyongqiang
            Votes:
            0 Vote for this issue
            Watchers:
            12 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Time Tracking

                Estimated:
                Original Estimate - Not Specified
                Not Specified
                Remaining:
                Remaining Estimate - 0h
                0h
                Logged:
                Time Spent - 20m
                20m

                Slack

                  Issue deployment