Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-27762

Kafka WakeupException during handling splits changes

    XMLWordPrintableJSON

Details

    Description

       

      We enable dynamic partition discovery in our flink job, but job failed when kafka partition is changed. 

      Exception detail is shown as follows:

      java.lang.RuntimeException: One or more fetchers have encountered exception
      	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
      	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
      	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
      	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
      	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
      	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
      	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
      	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
      	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
      	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	... 1 more
      Caused by: org.apache.kafka.common.errors.WakeupException
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1726)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
      	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:315)
      	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:200)
      	at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
      	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
      	... 6 more 

       

      After preliminary investigation, according to source code of KafkaSource,

      At first: 

      method org.apache.kafka.clients.consumer.KafkaConsumer.wakeup() will be called if consumer is polling data.

      Later: 

      method org.apache.kafka.clients.consumer.KafkaConsumer.position() will be called during handle splits changes.

      Since consumer has been waken up, it will throw WakeUpException.

      Attachments

        Activity

          People

            renqs Qingsheng Ren
            zoucan zoucan
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: