Flink / FLINK-27041

KafkaSource in batch mode failing if any topic partition is empty


Details

    • Important

    Description

      First, consider the simplest case: consuming from a Kafka topic with a single partition that contains 0 messages. In batch mode, with the bounded (stopping) offsets set to latest, the job is expected to finish gracefully. Instead, it fails with an exception.
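To make the expected behavior concrete: with setBounded(OffsetsInitializer.latest()), each partition's stopping offset is its end offset, and since the example does not call setStartingOffsets, reading is assumed to start from the earliest offset (the builder's default). For an empty partition both offsets are 0, so its split has nothing to read and should count as finished immediately. The sketch below models this with a hypothetical PartitionSplit class (plain Java, not the connector's KafkaPartitionSplit), assuming the stopping offset is exclusive.

```java
import java.util.Objects;

/** Hypothetical stand-in for a Kafka partition split; not the connector's KafkaPartitionSplit. */
final class PartitionSplit {
    final String topic;
    final int partition;
    final long startingOffset; // first offset to read (earliest by default, assumed)
    final long stoppingOffset; // exclusive bound, e.g. the end offset resolved by latest()

    PartitionSplit(String topic, int partition, long startingOffset, long stoppingOffset) {
        this.topic = Objects.requireNonNull(topic);
        this.partition = partition;
        this.startingOffset = startingOffset;
        this.stoppingOffset = stoppingOffset;
    }

    /** A split is empty when the range [startingOffset, stoppingOffset) holds no records. */
    boolean isEmpty() {
        return startingOffset >= stoppingOffset;
    }

    /** Number of records a bounded read of this split would emit. */
    long recordCount() {
        return Math.max(0L, stoppingOffset - startingOffset);
    }
}
```

Under these assumptions, the single empty partition of test_topic corresponds to new PartitionSplit("test_topic", 0, 0L, 0L), for which isEmpty() is true and recordCount() is 0.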

      Consider this minimal reproducible example (assuming that test_topic exists with 1 partition and 0 messages):

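      import org.apache.flink.api.common.RuntimeExecutionMode;
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.connector.kafka.source.KafkaSource;
      import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      
      // Wrapper class name is arbitrary
      public class KafkaSourceTestJob {
      	public static void main(String[] args) throws Exception {
      		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      		// Run as a bounded (batch) job
      		env.setRuntimeMode(RuntimeExecutionMode.BATCH);
      
      		KafkaSource<String> kafkaSource = KafkaSource
      				.<String>builder()
      				.setBootstrapServers("localhost:9092")
      				.setTopics("test_topic")
      				.setValueOnlyDeserializer(new SimpleStringSchema())
      				// Stop reading at the latest offset of each partition
      				.setBounded(OffsetsInitializer.latest())
      				.build();
      
      		DataStream<String> stream = env.fromSource(
      				kafkaSource,
      				WatermarkStrategy.noWatermarks(),
      				"Kafka Source"
      		);
      		stream.print();
      
      		// Expected to finish; fails instead when the only partition of test_topic is empty
      		env.execute("Flink KafkaSource test job");
      	}
      }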
      

      This produces the following exception:

      Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        ... [omitted for readability]
      Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        ... [omitted for readability]
      Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
      	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
      	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
      	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
      	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:351)
      	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
      	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
      	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
      	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
      	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
      	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	... 1 more
      Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1223)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
      	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
      	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
      	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
      	... 6 more
      
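The innermost cause in the trace is KafkaPartitionSplitReader.fetch calling KafkaConsumer.poll on a consumer that has no assigned partitions. A plausible reading (an assumption, not confirmed by the trace alone) is that the reader keeps empty splits out of the consumer's assignment, so a reader holding only empty splits polls with nothing assigned. The sketch below models that failure and one possible guard: when the assignment is empty, skip the poll and report the empty splits as finished. FakeConsumer and SplitReaderModel are hypothetical plain-Java stand-ins, not Flink or Kafka classes; FakeConsumer.poll only mimics the IllegalStateException that KafkaConsumer throws.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for KafkaConsumer: poll() fails when nothing is assigned. */
final class FakeConsumer {
    final Set<Integer> assignment = new LinkedHashSet<>();

    /** Records are not modeled; this only reproduces the precondition check. */
    void poll() {
        if (assignment.isEmpty()) {
            // Same message as the IllegalStateException in the stack trace
            throw new IllegalStateException(
                    "Consumer is not subscribed to any topics or assigned any partitions");
        }
    }
}

/** Hypothetical model of a split reader; not Flink's KafkaPartitionSplitReader. */
final class SplitReaderModel {
    final FakeConsumer consumer = new FakeConsumer();
    final List<Integer> emptySplits = new ArrayList<>();
    final boolean guardEmptyAssignment;

    SplitReaderModel(boolean guardEmptyAssignment) {
        this.guardEmptyAssignment = guardEmptyAssignment;
    }

    /** Empty splits (start >= stop) are remembered but never assigned to the consumer. */
    void addSplit(int partition, long startingOffset, long stoppingOffset) {
        if (startingOffset >= stoppingOffset) {
            emptySplits.add(partition);
        } else {
            consumer.assignment.add(partition);
        }
    }

    /** Polls if appropriate, then returns the partitions reported as finished. */
    List<Integer> fetch() {
        boolean skipPoll = guardEmptyAssignment && consumer.assignment.isEmpty();
        if (!skipPoll) {
            consumer.poll(); // throws when nothing is assigned and the guard is off
        }
        // Empty splits have nothing to read, so they are finished right away
        List<Integer> finished = new ArrayList<>(emptySplits);
        emptySplits.clear();
        return finished;
    }
}
```

With the guard off, a reader given only the empty split (0, 0L, 0L) throws on fetch(), matching the reported failure; with the guard on, the same reader returns [0] and could finish. Catching the IllegalStateException around poll would be an alternative design; checking the assignment first avoids using an exception for control flow.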

      In our actual use case, we have a Kafka topic with many partitions, some of which contain no messages. This causes our batch job to fail.
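With many partitions, a failure of this kind can occur as soon as one source subtask is assigned only empty partitions, even if other partitions hold data. The sketch below illustrates this under a simplified round-robin assignment (partition % parallelism, an assumption; Flink's enumerator may distribute partitions differently) and hypothetical offsets. EmptyReaderFinder is a hypothetical helper, not part of Flink. A partition counts as empty when its earliest offset is not below its end offset, which also covers partitions whose records were all removed by retention.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink: finds readers that get only empty partitions. */
final class EmptyReaderFinder {
    /**
     * @param beginOffsets earliest available offset per partition, indexed by partition id
     * @param endOffsets   end (latest) offset per partition, indexed by partition id
     * @param parallelism  number of source readers (subtasks)
     * @return ids of readers that have at least one split, all of them empty
     */
    static List<Integer> readersWithOnlyEmptyPartitions(
            long[] beginOffsets, long[] endOffsets, int parallelism) {
        if (beginOffsets.length != endOffsets.length) {
            throw new IllegalArgumentException("offset arrays must have the same length");
        }
        boolean[] hasSplits = new boolean[parallelism];
        boolean[] hasData = new boolean[parallelism];
        for (int partition = 0; partition < endOffsets.length; partition++) {
            // Simplified round-robin assignment (assumption, not Flink's exact formula)
            int reader = partition % parallelism;
            hasSplits[reader] = true;
            // Reading from earliest to latest: the partition is empty when begin >= end
            if (beginOffsets[partition] < endOffsets[partition]) {
                hasData[reader] = true;
            }
        }
        List<Integer> result = new ArrayList<>();
        for (int reader = 0; reader < parallelism; reader++) {
            if (hasSplits[reader] && !hasData[reader]) {
                result.add(reader);
            }
        }
        return result;
    }
}
```

For example, six partitions with begin offsets all 0 and end offsets {10, 0, 5, 0, 0, 0} at parallelism 2 leave reader 1 with partitions 1, 3 and 5, all empty, so the method returns [1]; at parallelism 1 it returns an empty list because the single reader also owns partitions with data.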

            People

              Assignee: Qingsheng Ren (renqs)
              Reporter: Terxor (terxor)
              Votes: 0
              Watchers: 6