Flink / FLINK-16626

Prevent REST handler from being closed more than once


    Details

      Description

      In the Flink 1.10.0 release, job cancellation can be problematic: users frequently see a java.util.concurrent.TimeoutException on the client side. The REST endpoint closes prematurely, before it sends out the response, because the jobCancellationHandler is incorrectly reused and therefore closed twice.
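The failure mode, and the fix named in the title, can be illustrated with a small self-contained sketch. It is not Flink's code. It assumes, hypothetically, that a handler tracks in-flight requests with a java.util.concurrent.Phaser holding one party for the handler plus one per request, and that "reused" means one handler instance appears twice in the list that is closed on shutdown. InFlightTracker, SketchHandler and DoubleCloseDemo are made-up names. Without a guard, the second close releases one party too many, so termination is reported while the cancel request is still in flight. Caching the first termination future makes repeated closes harmless.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;

class DoubleCloseDemo {
    // True if shutdown reports "terminated" while a request is still in flight.
    static boolean terminatesEarly(boolean guarded) {
        SketchHandler shared = new SketchHandler(guarded);
        // Hypothetical: one handler instance registered under two routes.
        List<SketchHandler> registered = Arrays.asList(shared, shared);
        shared.tracker().startRequest(); // the cancel request is still being served
        List<CompletableFuture<Void>> closing = new ArrayList<>();
        for (SketchHandler h : registered) {
            closing.add(h.closeAsync()); // shutdown closes every registered entry
        }
        boolean early = closing.stream().anyMatch(CompletableFuture::isDone);
        shared.tracker().finishRequest(); // the response has now been written
        return early;
    }

    public static void main(String[] args) {
        System.out.println("unguarded close terminates early: " + terminatesEarly(false));
        System.out.println("guarded close terminates early:   " + terminatesEarly(true));
    }
}

// Hypothetical in-flight request tracker: one party for the handler itself
// plus one party per request currently being served.
class InFlightTracker {
    private final CompletableFuture<Void> terminated = new CompletableFuture<>();
    private final Phaser phaser = new Phaser(1) {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            terminated.complete(null); // every remaining party has arrived
            return true;               // terminate the phaser
        }
    };

    void startRequest() { phaser.register(); }

    void finishRequest() { phaser.arriveAndDeregister(); }

    // Releases the handler's own party; the future completes once no requests remain.
    CompletableFuture<Void> awaitAsync() {
        phaser.arriveAndDeregister();
        return terminated;
    }
}

// Hypothetical handler; 'guarded' toggles the idempotent close.
class SketchHandler {
    private final InFlightTracker tracker = new InFlightTracker();
    private final boolean guarded;
    private final Object lock = new Object();
    private CompletableFuture<Void> terminationFuture; // guarded by lock

    SketchHandler(boolean guarded) { this.guarded = guarded; }

    InFlightTracker tracker() { return tracker; }

    CompletableFuture<Void> closeAsync() {
        if (!guarded) {
            // Buggy variant: each call releases one more party, so a second call
            // effectively releases the party of the in-flight request.
            return tracker.awaitAsync();
        }
        synchronized (lock) {
            if (terminationFuture == null) { // the first close does the work
                terminationFuture = tracker.awaitAsync();
            }
            return terminationFuture;        // later closes reuse the same future
        }
    }
}
```

Running main should report early termination (true) for the unguarded variant and false for the guarded one. Returning the cached future on a second close, rather than throwing, keeps shutdown code that blindly closes every registered entry working unchanged.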
       
      When the following command is used to stop a Flink job running in YARN per-job mode, the client keeps retrying until it times out (after 1 minute) and exits with a failure, even though the job itself stops successfully.
      Command:

      flink cancel $jobId -yid $appId
      

      The exception on the client side is:

       

      2020-03-17 12:32:13,709 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
      ...
      2020-03-17 12:33:11,065 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
      2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
      2020-03-17 12:33:14,070 DEBUG org.apache.flink.runtime.rest.RestClient - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to ocdt31.aicloud.local:51231/v1/jobs/cc61033484d4c0e7a27a8a2a36f4de7a?mode=cancel
      2020-03-17 12:33:14,077 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache - Freed 2 thread-local buffer(s) from thread: flink-rest-client-netty-thread-1
      2020-03-17 12:33:14,080 DEBUG org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
      2020-03-17 12:33:14,083 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command.
      org.apache.flink.util.FlinkException: Could not cancel job cc61033484d4c0e7a27a8a2a36f4de7a.
      at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
      at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
      at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
      at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
      at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
      at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
      Caused by: java.util.concurrent.TimeoutException
      at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
      at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
      ... 9 more

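The client-side TimeoutException is raised by a bounded CompletableFuture.get(timeout, unit) call (note the CompletableFuture.timedGet frame in the trace): if no response arrives within the timeout, the client gives up even though the server may already have acted on the request. The following is a minimal JDK-only sketch of that behavior; awaitResponse and the 100 ms timeout are illustrative assumptions, not Flink's client code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ClientTimeoutDemo {
    // Blocks for at most timeoutMillis waiting for the server's reply.
    static String awaitResponse(CompletableFuture<String> response, long timeoutMillis) {
        try {
            return response.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // No reply in time; says nothing about whether the server acted on the request.
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        }
    }

    public static void main(String[] args) {
        // Stands in for a response that is never sent because the endpoint shut down first.
        CompletableFuture<String> lost = new CompletableFuture<>();
        System.out.println(awaitResponse(lost, 100));

        // A response that did arrive.
        CompletableFuture<String> answered = CompletableFuture.completedFuture("accepted");
        System.out.println(awaitResponse(answered, 100));
    }
}
```

This is why the job can be cancelled while the CLI still reports failure: the timeout only means the reply never reached the client.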

      The job was in fact cancelled, but the server also logs the following exception:

       

      2020-03-17 12:25:13,754 ERROR [flink-akka.actor.default-dispatcher-17] org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:766) - Failed to submit a listener notification task. Event loop shut down?
      java.util.concurrent.RejectedExecutionException: event executor terminated
      at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:855)
      at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:340)
      at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:333)
      at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:766)
      at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:764)
      at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:421)
      at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:149)
      at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
      at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
      at org.apache.flink.runtime.rest.handler.util.HandlerUtils.sendResponse(HandlerUtils.java:224)
      at org.apache.flink.runtime.rest.handler.util.HandlerUtils.sendResponse(HandlerUtils.java:176)
      at org.apache.flink.runtime.rest.handler.util.HandlerUtils.sendResponse(HandlerUtils.java:91)
      at org.apache.flink.runtime.rest.handler.AbstractRestHandler.lambda$respondToRequest$0(AbstractRestHandler.java:78)
      at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
      at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
      at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
      at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
      at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
      at akka.dispatch.OnComplete.internal(Future.scala:264)
      at akka.dispatch.OnComplete.internal(Future.scala:261)
      at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
      at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
      at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
      at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
      at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
      at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
      at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
      at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
      at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
      at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
      at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
      at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
      at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
      at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
      at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
      at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
      at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
      at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
      at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
      at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
      at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
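The server-side error shows HandlerUtils.sendResponse handing a listener to a Netty event loop that had already shut down, so the response is never written. The same failure mode can be reproduced with a plain JDK executor. This is an analogy built on java.util.concurrent, not on Netty's classes; ShutDownLoopDemo and trySendResponse are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class ShutDownLoopDemo {
    // Tries to schedule "write the response" on the loop; reports whether it was accepted.
    static boolean trySendResponse(ExecutorService eventLoop) {
        try {
            eventLoop.execute(() -> System.out.println("response written"));
            return true;
        } catch (RejectedExecutionException e) {
            // Analogous to Netty's "event executor terminated" in the server log.
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        System.out.println("accepted before shutdown: " + trySendResponse(eventLoop));
        eventLoop.shutdown(); // the endpoint shuts down while a reply is still pending
        eventLoop.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("accepted after shutdown: " + trySendResponse(eventLoop));
    }
}
```

Once the executor has been shut down, every later submission is rejected. This mirrors a reply that is produced only after the endpoint's event loop is gone.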

            People

            • Assignee:
              kyledong Weike Dong
              Reporter:
              chaiyq chaiyongqiang

                Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 20m
