Description
We recently encountered a bug after upgrading Kafka client to 3.2.1 in Flink Kafka connector (FLINK-29153). Here's the exception:
org.apache.kafka.common.errors.WakeupException at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:514) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:252) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnready(ConsumerCoordinator.java:493) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1055) at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1573) at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:226)
As WakeupException is not listed in the JavaDoc of KafkaConsumer#commitAsync, Flink Kafka connector doesn't catch the exception thrown directly from KafkaConsumer#commitAsync but handles all exceptions in the callback.
I checked the source code and suspect this is caused by KAFKA-13563. Also we never had this exception in commitAsync when we used Kafka client 2.4.1 & 2.8.1.
I'm wondering if this is kind of breaking the public API as the WakeupException is not listed in JavaDoc, and maybe it's better to invoke the callback to handle the WakeupException instead of throwing it directly from the method itself.