Details
- Bug
- Status: Resolved
- Major
- Resolution: Fixed
- 1.6.2
- None
Description
In the run method of FlinkKafkaConsumerBase (line 721 on the master branch), if kafkaFetcher.runFetchLoop() throws an exception, the code after it is never executed. In particular, discoveryLoopError is never thrown, so the real culprit exception is swallowed. This happens when discoveryLoopThread throws an exception and its finally block calls the cancel method: cancel calls kafkaFetcher.cancel, which in the Kafka09Fetcher implementation calls handover.close, which in turn causes handover.pollNext to throw a ClosedException.
The failure log looks like this:
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
    at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695)
    at java.lang.Thread.run(Thread.java:745)
Should we modify it as follows?
try {
    kafkaFetcher.runFetchLoop();
} catch (Exception e) {
    // if discoveryLoopErrorRef is not null, we should throw the real culprit exception
    if (discoveryLoopErrorRef.get() != null) {
        throw new RuntimeException(discoveryLoopErrorRef.get());
    } else {
        throw e;
    }
}
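To make the effect of the proposed change easier to see, here is a minimal, self-contained sketch of the same pattern outside of Flink. It is not the connector code: FakeFetcher, the local ClosedException, the rethrowDiscoveryError flag, and the failureOf helper are hypothetical stand-ins invented for illustration, and only the discoveryLoopErrorRef name and the catch logic mirror the snippet above.

```java
import java.util.concurrent.atomic.AtomicReference;

public class DiscoveryErrorDemo {

    /** Hypothetical stand-in for Handover.ClosedException. */
    static class ClosedException extends Exception {}

    /** Hypothetical stand-in for the fetcher: blocks until cancelled, then fails. */
    static class FakeFetcher {
        private final Object lock = new Object();
        private boolean closed;

        void runFetchLoop() throws ClosedException {
            synchronized (lock) {
                while (!closed) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
            // Once closed, the fetch loop fails the way pollNext does in the report.
            throw new ClosedException();
        }

        void cancel() {
            synchronized (lock) {
                closed = true;
                lock.notifyAll();
            }
        }
    }

    /** Runs the fetch loop; the flag switches the proposed catch logic on or off. */
    static void run(boolean rethrowDiscoveryError) throws Exception {
        AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
        FakeFetcher fetcher = new FakeFetcher();

        Thread discoveryLoopThread = new Thread(() -> {
            try {
                // Simulated discovery failure: this is the real culprit.
                throw new IllegalStateException("partition discovery failed");
            } catch (Exception e) {
                discoveryLoopErrorRef.set(e);
            } finally {
                // Cancelling closes the fetcher, which makes runFetchLoop throw.
                fetcher.cancel();
            }
        });
        discoveryLoopThread.start();

        try {
            fetcher.runFetchLoop();
        } catch (Exception e) {
            Exception discoveryError = discoveryLoopErrorRef.get();
            if (rethrowDiscoveryError && discoveryError != null) {
                // Proposed behavior: surface the discovery error instead.
                throw new RuntimeException(discoveryError);
            }
            // Behavior described in the report: only the secondary exception escapes.
            throw e;
        }
    }

    /** Returns whatever run(...) throws, or null if it completes normally. */
    static Throwable failureOf(boolean rethrowDiscoveryError) {
        try {
            run(rethrowDiscoveryError);
            return null;
        } catch (Exception e) {
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println("without fix: " + failureOf(false));
        System.out.println("with fix:    " + failureOf(true));
    }
}
```

In this sketch the discovery thread stores its error before it cancels the fetcher, so the reference is already populated by the time the catch block reads it. Whether the real connector guarantees the same ordering is an assumption to verify against the actual code.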
Issue Links
- Is contained by: FLINK-10774 connection leak when partition discovery is disabled and open throws exception (Resolved)