Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Description
KafkaConsumer.wakeup() can be used to interrupt blocking operations (e.g. in order to shut down), so wakeup exceptions must be propagated to the user. Currently, there are several locations in the code where a wakeup exception can be caught and silently discarded. For example, when the rebalance callback is invoked, we just catch and log all exceptions. In this case, we also need to be careful that wakeup exceptions do not affect rebalance callback semantics. In particular, it is currently possible for a wakeup to cause onPartitionsRevoked to be invoked multiple times.
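To illustrate the catch-and-log problem described above, here is a minimal sketch that uses only the Java standard library. It is not the actual consumer code: the nested WakeupException class is a stand-in for org.apache.kafka.common.errors.WakeupException, and RevokeCallback, invokeSwallowingAll, and invokePropagatingWakeup are hypothetical names introduced for illustration. It shows one possible way to let a wakeup propagate while still logging other callback failures; it does not address the repeated onPartitionsRevoked invocation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class WakeupPropagationSketch {

    // Stand-in for org.apache.kafka.common.errors.WakeupException (assumption:
    // defined locally so the sketch runs without the Kafka client library).
    static class WakeupException extends RuntimeException {}

    // Hypothetical stand-in for the revoke half of a rebalance listener.
    interface RevokeCallback {
        void onPartitionsRevoked(List<String> partitions);
    }

    // The problematic pattern: every exception, including a wakeup,
    // is caught and logged, so the caller never sees the wakeup.
    static void invokeSwallowingAll(RevokeCallback cb, List<String> partitions, List<String> log) {
        try {
            cb.onPartitionsRevoked(partitions);
        } catch (Exception e) {
            log.add("callback failed: " + e.getClass().getSimpleName());
        }
    }

    // One possible fix: rethrow wakeups, keep logging everything else.
    static void invokePropagatingWakeup(RevokeCallback cb, List<String> partitions, List<String> log) {
        try {
            cb.onPartitionsRevoked(partitions);
        } catch (WakeupException e) {
            throw e; // must reach the user so a blocking call can be interrupted
        } catch (Exception e) {
            log.add("callback failed: " + e.getClass().getSimpleName());
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<String> partitions = Collections.singletonList("topic-0");
        // Simulates a wakeup arriving while the callback is running.
        RevokeCallback wakesUp = p -> { throw new WakeupException(); };

        invokeSwallowingAll(wakesUp, partitions, log);
        System.out.println("swallowed, log entries: " + log.size());

        try {
            invokePropagatingWakeup(wakesUp, partitions, log);
        } catch (WakeupException e) {
            System.out.println("wakeup propagated to caller");
        }
    }
}
```

In the first call the wakeup only shows up as a log entry; in the second it reaches the caller, which is the behavior this issue asks for.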