FLINK-10721

Kafka discovery-loop exceptions may be swallowed


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.2
    • Fix Version/s: 1.6.4, 1.7.2, 1.8.0
    • Component/s: Connectors / Kafka
    • Labels: None

    Description

      In the FlinkKafkaConsumerBase run method, around line 721 on the master branch, if kafkaFetcher.runFetchLoop() throws an exception, none of the code that follows it is executed. This happens, for example, when the discoveryLoopThread throws an exception: its finally block then calls the cancel method, cancel calls kafkaFetcher.cancel(), which in the Kafka09Fetcher implementation calls handover.close(), and that in turn makes handover.pollNext() throw a ClosedException. Because the remaining code is skipped, discoveryLoopError is never rethrown, so the real culprit exception is swallowed (see the sketch after the failure log below).
      The failure log looks like this:

      org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
        at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695)
        at java.lang.Thread.run(Thread.java:745)
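
      For context, the following is a minimal, self-contained sketch of the control flow described above. It is not the actual FlinkKafkaConsumerBase source: the class and the FakeFetcher stand-in are hypothetical, and only the names discoveryLoopErrorRef, discoveryLoopThread and kafkaFetcher are taken from the description. Running it terminates with the stand-in "handover closed" exception while the stored discovery-loop error is silently dropped:

      import java.util.concurrent.atomic.AtomicReference;

      // Simplified, hypothetical sketch -- NOT the real FlinkKafkaConsumerBase.
      public class DiscoveryLoopSketch {

          private final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
          private final FakeFetcher kafkaFetcher = new FakeFetcher();

          public void run() throws Exception {
              Thread discoveryLoopThread = new Thread(() -> {
                  try {
                      // partition discovery loop; assume it fails here
                      throw new RuntimeException("real culprit: partition discovery failed");
                  } catch (Exception e) {
                      discoveryLoopErrorRef.set(e);
                  } finally {
                      // mirrors cancel(): cancelling the fetcher closes the handover,
                      // which makes runFetchLoop() fail as well
                      kafkaFetcher.cancel();
                  }
              });
              discoveryLoopThread.start();

              // If runFetchLoop() throws (e.g. Handover$ClosedException triggered by
              // the cancel above), everything below is skipped and the stored
              // discovery-loop error is never rethrown -- it is swallowed.
              kafkaFetcher.runFetchLoop();

              discoveryLoopThread.join();

              Exception discoveryLoopError = discoveryLoopErrorRef.get();
              if (discoveryLoopError != null) {
                  throw new RuntimeException(discoveryLoopError);
              }
          }

          /** Stand-in for the fetcher/handover, just enough to show the failure mode. */
          private static class FakeFetcher {
              private volatile boolean closed;

              void runFetchLoop() throws Exception {
                  while (!closed) {
                      Thread.sleep(10);
                  }
                  // plays the role of Handover.pollNext() throwing ClosedException
                  throw new IllegalStateException("handover closed");
              }

              void cancel() {
                  closed = true;
              }
          }

          public static void main(String[] args) throws Exception {
              new DiscoveryLoopSketch().run();
          }
      }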
      

      Should we modify it as follows?

      try {
          kafkaFetcher.runFetchLoop();
      } catch (Exception e) {
          // if discoveryLoopErrorRef holds an error, rethrow the real culprit exception
          if (discoveryLoopErrorRef.get() != null) {
              throw new RuntimeException(discoveryLoopErrorRef.get());
          } else {
              throw e;
          }
      }
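
      A possible refinement (my suggestion only, not necessarily what the committed fix does) is to keep both failures visible by attaching the fetch-loop exception as a suppressed exception, so that neither the discovery-loop error nor the ClosedException is lost:

      try {
          kafkaFetcher.runFetchLoop();
      } catch (Exception e) {
          Exception discoveryLoopError = discoveryLoopErrorRef.get();
          if (discoveryLoopError != null) {
              // surface the discovery-loop error as the primary failure,
              // but keep the fetch-loop exception (e.g. the Handover
              // ClosedException) attached as a suppressed exception
              RuntimeException wrapped = new RuntimeException(discoveryLoopError);
              wrapped.addSuppressed(e);
              throw wrapped;
          } else {
              throw e;
          }
      }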
      

      Attachments

        Issue Links

        Activity


          People

            Assignee: Chesnay Schepler
            Reporter: zzsmdfj
            Votes: 0
            Watchers: 4

            Dates

              Created:
              Updated:
              Resolved:
