KAFKA-14208

KafkaConsumer#commitAsync throws unexpected WakeupException


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 3.2.1
    • Fix Version/s: 3.3.0, 3.2.3
    • Component/s: clients
    • Labels: None

    Description

      We recently encountered a bug after upgrading the Kafka client to 3.2.1 in the Flink Kafka connector (FLINK-29153). Here's the exception:

      org.apache.kafka.common.errors.WakeupException
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:514)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:252)
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnready(ConsumerCoordinator.java:493)
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1055)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1573)
      	at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:226) 

      As WakeupException is not listed in the JavaDoc of KafkaConsumer#commitAsync, the Flink Kafka connector doesn't catch exceptions thrown directly from KafkaConsumer#commitAsync; it handles all exceptions in the callback instead.

      I checked the source code and suspect this is caused by KAFKA-13563. We also never saw this exception from commitAsync when we used Kafka client 2.4.1 and 2.8.1.

      I'm wondering whether this breaks the public API, since WakeupException is not listed in the JavaDoc. It may be better to invoke the callback with the WakeupException instead of throwing it directly from the method itself.
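
      To illustrate the behavior suggested above, here is a minimal caller-side sketch, not the actual fix. It assumes nothing about Kafka internals: SafeAsyncCommit and commitAsyncSafely are hypothetical names, and the commit call is modeled as a plain function that accepts a callback, so the snippet compiles without the Kafka client on the classpath. Any exception thrown synchronously by the commit call is passed to the same callback that receives asynchronous failures.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical helper, not part of the Kafka client API.
final class SafeAsyncCommit {
    private SafeAsyncCommit() {}

    /**
     * Runs an async-commit style call and routes any exception it throws
     * synchronously into the callback, so the caller has a single place
     * to handle failures.
     *
     * @param commitAsync the commit call, modeled as a function taking a callback
     * @param callback    receives (offsets, null) on success or (null, error) on failure
     */
    static <O> void commitAsyncSafely(
            Consumer<BiConsumer<O, Exception>> commitAsync,
            BiConsumer<O, Exception> callback) {
        try {
            commitAsync.accept(callback);
        } catch (RuntimeException e) {
            // Thrown directly from the call itself (for example a wakeup):
            // hand it to the callback instead of letting it propagate.
            callback.accept(null, e);
        }
    }

    public static void main(String[] args) {
        // Simulate a commit call that throws before the callback is ever invoked.
        SafeAsyncCommit.<String>commitAsyncSafely(
                cb -> { throw new IllegalStateException("simulated wakeup"); },
                (offsets, error) -> System.out.println("callback received: " + error));
    }
}
```

      With the real client, the first argument could plausibly be written as cb -> consumer.commitAsync(offsets, cb::accept), since OffsetCommitCallback#onComplete takes the offsets map and an Exception. Whether a WakeupException should be absorbed this way or rethrown depends on why the caller invoked wakeup() in the first place.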

          People

            Assignee: Guozhang Wang
            Reporter: Qingsheng Ren
            Votes: 0
            Watchers: 4
