Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Fixed
Description
A common usage pattern with the new consumer is:
    try {
        records = consumer.poll();
        // process records
    } catch (WakeupException e) {
        consumer.close();
    }
Here close() may commit offsets. However, within a single poll() call the consumer performs the following steps, in order:
1) Trigger client.poll().
2) Possibly update the consumed position, if the fetch response contained data.
3) Before returning the records, possibly trigger another client.poll().
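If a WakeupException is thrown in step 3, the consumed position has already been advanced past records that were never returned to the caller. The close() in the catch block then commits those offsets, so the unreturned messages are skipped, which results in data loss.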
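The ordering problem can be reproduced with a small toy model that has no Kafka dependency. This is only a sketch of the three steps listed above, not the actual consumer code: ToyConsumer, ToyWakeupException, clientPoll() and the wakeupBeforeSecondClientPoll flag are all hypothetical names introduced for illustration, and the "commit on close" behaviour is assumed to be enabled.

```java
import java.util.ArrayList;
import java.util.List;

public class WakeupDataLossSketch {

    /** Hypothetical stand-in for Kafka's WakeupException. */
    static class ToyWakeupException extends RuntimeException {}

    /** Toy consumer that mimics only the ordering of steps inside poll(). */
    static class ToyConsumer {
        private final List<String> log;       // records available in the partition
        private int position = 0;             // consumed position
        private int committed = 0;            // last committed offset
        private boolean wakeupPending = false;

        ToyConsumer(List<String> log) { this.log = log; }

        void wakeup() { wakeupPending = true; }

        /** Models client.poll(): throws if a wakeup has been requested. */
        private void clientPoll() {
            if (wakeupPending) {
                wakeupPending = false;
                throw new ToyWakeupException();
            }
        }

        /** The flag lets the demo inject a wakeup between steps 2 and 3. */
        List<String> poll(boolean wakeupBeforeSecondClientPoll) {
            clientPoll();                                        // step 1
            List<String> records =
                new ArrayList<>(log.subList(position, log.size()));
            position = log.size();                               // step 2: position advanced
            if (wakeupBeforeSecondClientPoll) {
                wakeup();
            }
            clientPoll();                                        // step 3: may throw
            return records;                                      // never reached on wakeup
        }

        /** Assumed behaviour: close() commits the current position. */
        void close() { committed = position; }

        int committed() { return committed; }
    }

    /** Runs the usage pattern once; returns {recordsProcessed, committedOffset}. */
    static long[] run() {
        ToyConsumer consumer = new ToyConsumer(List.of("a", "b", "c"));
        long processed = 0;
        try {
            List<String> records = consumer.poll(true);
            processed += records.size();                         // "process records"
        } catch (ToyWakeupException e) {
            consumer.close();
        }
        return new long[] { processed, consumer.committed() };
    }

    public static void main(String[] args) {
        long[] result = run();
        System.out.println("processed=" + result[0] + " committed=" + result[1]);
        // prints "processed=0 committed=3"
    }
}
```

In this model the application has processed no records, yet offset 3 is committed, which is the data loss described above.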