Details
- Bug
- Status: Resolved
- Major
- Resolution: Fixed
- None
Description
One of the main improvements introduced by the cooperative protocol was the ability to continue processing records during a rebalance. In Streams, we take advantage of this by polling with a timeout of 0 while a rebalance is in progress, so that poll returns immediately and the thread continues through the main loop to process new records. The main poll loop uses an algorithm based on max.poll.interval.ms to ensure the StreamThread returns to call #poll in time to stay in the consumer group.
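To make the loop behavior concrete, here is a minimal sketch of the two decisions described above: picking a zero poll timeout while a rebalance is in progress, and bounding the processing loop by a time budget. This is a simplified model, not the actual StreamThread code; the class, the method names, and the idea of a single fixed per-record cost are all assumptions made for illustration.

```java
import java.time.Duration;

// Simplified, hypothetical model of the poll loop; not the real StreamThread logic.
class PollLoopSketch {

    // While a rebalance is in progress, poll with a zero timeout so the
    // call returns immediately and processing can continue.
    static Duration pollTimeout(boolean rebalanceInProgress, Duration normalPollTime) {
        return rebalanceInProgress ? Duration.ZERO : normalPollTime;
    }

    // Process buffered records until they run out or the time budget
    // (derived from max.poll.interval.ms in the real loop) is exhausted.
    // Returns how many records were processed before going back to poll.
    static int recordsProcessedBeforeDeadline(int buffered, long perRecordMs, long budgetMs) {
        int processed = 0;
        long elapsedMs = 0;
        while (processed < buffered && elapsedMs + perRecordMs <= budgetMs) {
            elapsedMs += perRecordMs; // assumed fixed cost per record
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(pollTimeout(true, Duration.ofMillis(100)));
        System.out.println(recordsProcessedBeforeDeadline(1000, 10, 300_000));
        System.out.println(recordsProcessedBeforeDeadline(1000, 1000, 300_000));
    }
}
```

With cheap records the loop ends when the buffer is empty; with expensive records it only ends when the budget runs out, which is the case that delays the return to #poll.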
Generally speaking, the thread should exit the processing loop and invoke poll within a few minutes at most, based on the poll interval, though it will typically break out much sooner, once it has used up all the records from the last poll (bounded by the max.poll.records config, which Streams sets to 1,000 by default). However, with heavy processing or a higher max.poll.records, the thread may continue processing for more than a few seconds. If it had sent out a JoinGroup request before going on to process and was waiting for its JoinGroup response, then once it does return to invoke #poll it will process this response and send out a SyncGroup. But if the processing took too long, this SyncGroup may immediately fail with the REBALANCE_IN_PROGRESS error.
Essentially, while the thread was processing, the group leader will itself have been processing the JoinGroup subscriptions of all members, generating an assignment, and sending it back in its SyncGroup request. This may take only a few seconds or less, and the group coordinator will not yet have noticed (or cared) that one of the consumers hasn't sent a SyncGroup. It will just return the assigned partitions in the SyncGroup responses of the members who responded in time, and "complete" the rebalance in their eyes. But if the assignment involved moving any partitions from one consumer to another, a followup rebalance needs to be triggered right away to finish assigning the partitions that were revoked in the previous rebalance. This is what causes a new rebalance to be kicked off just seconds after the first one began.
If the consumer that was stuck processing was among those that needed to revoke partitions, this can lead to repeating rebalances. Since it fails the SyncGroup of the first rebalance, it never receives the assignment for it and never knows to revoke those partitions, meaning it will rejoin for the new rebalance still claiming them among its ownedPartitions. When the assignor generates the same assignment for the second rebalance, it will again see that some partitions need to be revoked and will therefore trigger yet another rebalance after finishing the second. This can go on for as long as the StreamThreads are struggling to finish the JoinGroup phase in time due to processing.
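The feedback loop can be illustrated with a toy simulation. This is only a sketch under strong assumptions: a single consumer A that owns partitions 0-3, a fixed target assignment that leaves A with 0 and 1, and a hypothetical `roundsStuck` parameter for how many consecutive rebalances A misses its SyncGroup. None of these names come from the Kafka code base.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy model of cooperative rebalancing with one slow member; all names are hypothetical.
class RepeatedRebalanceSketch {

    // Returns the number of rebalances until the group is stable,
    // or -1 if it is still rebalancing after maxRounds.
    static int rebalancesUntilStable(int roundsStuck, int maxRounds) {
        // Partitions consumer A claims in its ownedPartitions when it joins.
        Set<Integer> ownedByA = new TreeSet<>(List.of(0, 1, 2, 3));
        // The assignor's target: A keeps 0 and 1; 2 and 3 should move to another consumer.
        Set<Integer> targetA = Set.of(0, 1);

        for (int round = 1; round <= maxRounds; round++) {
            Set<Integer> toRevoke = new TreeSet<>(ownedByA);
            toRevoke.removeAll(targetA);
            if (toRevoke.isEmpty()) {
                // Nothing is still claimed by A, so the moved partitions can be
                // handed out in this round and the group settles.
                return round;
            }
            boolean missedSyncGroup = round <= roundsStuck;
            if (!missedSyncGroup) {
                // A receives the assignment and revokes the extra partitions.
                ownedByA.retainAll(targetA);
            }
            // If A missed the SyncGroup it keeps claiming everything. Either way,
            // a followup rebalance is needed because partitions had to be revoked.
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(rebalancesUntilStable(0, 10)); // healthy case
        System.out.println(rebalancesUntilStable(3, 10)); // stuck for three rounds
    }
}
```

In this model a healthy group needs two rebalances (revoke, then reassign), and every round in which A misses its SyncGroup adds one more.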
Note that the best workaround at the moment is probably to set a lower max.poll.records, which reduces the duration of the processing loop.
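As a rough illustration of the workaround, the sketch below sets max.poll.records in a plain Properties object and estimates the worst-case loop duration as records times per-record cost. The helper names and the 20 ms per-record figure are assumptions; the estimate ignores the time-based bound from max.poll.interval.ms and is only meant to show why a smaller batch gets the thread back to #poll sooner.

```java
import java.util.Properties;

// Hypothetical helpers; only the "max.poll.records" key is a real consumer config.
class MaxPollRecordsSketch {

    // Build a config fragment with the given max.poll.records value.
    static Properties withMaxPollRecords(int maxPollRecords) {
        Properties props = new Properties();
        props.put("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    // Rough upper bound on one processing loop: every record from the
    // last poll is processed at an assumed fixed cost before polling again.
    static long worstCaseLoopMs(Properties props, long perRecordMs) {
        long maxPollRecords = Long.parseLong(props.getProperty("max.poll.records"));
        return maxPollRecords * perRecordMs;
    }

    public static void main(String[] args) {
        long perRecordMs = 20; // assumed processing cost per record
        System.out.println(worstCaseLoopMs(withMaxPollRecords(1000), perRecordMs));
        System.out.println(worstCaseLoopMs(withMaxPollRecords(100), perRecordMs));
    }
}
```

The resulting Properties would be merged into the application's existing Streams configuration; the right value depends on how long a single record actually takes to process.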
Issue Links
- relates to KAFKA-14419 Failed SyncGroup leading to partitions lost due to processing during rebalances (Open)