Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-29153

KafkaConsumerThread should catch WakeupException when committing offsets

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      KafkaConsumerThread in the legacy FlinkKafkaConsumer makes a wakeup on the KafkaConsumer on offset commit to wakeup the potential blocking KafkaConsumer.poll(). However the wakeup might happen when the consumer is not polling. The wakeup will be remembered by the consumer and re-examined while committing the offset asynchronously, which leads to an unnecessary WakeupException.

      As the JavaDoc and method signature of KafkaConsumer does not show that the WakeupException could be thrown in KafkaConsumer#commitAsync, this could be considered as a bug on Kafka side (breaking the contract of public API), but we can still fix it by catching the WakeupException and retrying.

      Attachments

        Issue Links

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            renqs Qingsheng Ren
            renqs Qingsheng Ren
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment