Details
-
Bug
-
Status: Resolved
-
Blocker
-
Resolution: Fixed
-
3.2.0
Description
Hi
In https://issues.apache.org/jira/browse/KAFKA-13310. we tried to fix a issue that consumer#poll(duration) will be returned after the provided duration. It's because if rebalance needed, we'll try to commit current offset first before rebalance synchronously. And if the offset committing takes too long, the consumer#poll will spend more time than provided duration. To fix that, we change commit sync with commit async before rebalance (i.e. onPrepareJoin).
However, in this ticket, we found the async commit will keep sending a new commit request during each Consumer#poll, because the offset commit never completes in time. The impact is that the existing consumer will be kicked out of the group after rebalance timeout without joining the group. That is, suppose we have consumer A in group G, and now consumer B joined the group, after the rebalance, only consumer B in the group.
The workaround for this issue is to change the assignor back to eager assignors, ex: StickyAssignor, RoundRobinAssignor.
To fix the issue, we come out 2 solutions:
- we can explicitly wait for the async commit complete in onPrepareJoin, but that would let the
KAFKA-13310issue happen again. - 2.we can try to keep the async commit offset future currently inflight. So that we can make sure each Consumer#poll, we are waiting for the future completes
Besides, there's also another bug found during fixing this bug. Before KAFKA-13310, we commitOffset sync with rebalanceTimeout, which will retry when retriable error until timeout. After KAFKA-13310, we thought we have retry, but we'll retry after partitions revoking. That is, even though the retried offset commit successfully, it still causes some partitions offsets un-committed, and after rebalance, other consumers will consume overlapping records.
===
we didn't wait for client to receive commit offset response here, so onJoinPrepareAsyncCommitCompleted will be false in cooperative rebalance, and client will loop in invoking onJoinPrepare.
I think the EAGER mode don't have this problem because it will revoke the partitions even if onJoinPrepareAsyncCommitCompleted=false and will not try to commit next round.
reproduce:
- single node Kafka version 3.2.0 && client version 3.2.0
- topic1 have 5 partititons
- start a consumer1 (cooperative rebalance)
- start another consumer2 (same consumer group)
- consumer1 will hang for a long time before re-join
- from server log consumer1 rebalance timeout before joineGroup and re-join with another memberId
consume1's log keeps printing:
16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation 54 and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938 (ConsumerCoordinator.java:739)
16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of offsets {topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} (ConsumerCoordinator.java:1143)
and coordinator's log:
[2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance group xxx in state PreparingRebalance with old generation 56 (__consumer_offsets-30) (reason: Adding new member consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed dynamic members who haven't joined: Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a) (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,856] INFO [GroupCoordinator 0]: Stabilized group xxx generation 57 (__consumer_offsets-30) with 3 members (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,048] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group xxx in CompletingRebalance state. Created a new member id consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,053] INFO [GroupCoordinator 0]: Assignment received from leader consumer-xxx-1-e842a14c-eff7-4b55-9463-72b9c2534afd for group xxx for generation 57. The group has 3 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,243] INFO [GroupCoordinator 0]: Preparing to rebalance group xxx in state PreparingRebalance with old generation 57 (__consumer_offsets-30) (reason: Adding new member consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
Attachments
Issue Links
- is related to
-
KAFKA-16235 auto commit still causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION
- Open
- links to