Kafka / KAFKA-14024

Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

Details

    Description

      Hi 

      In https://issues.apache.org/jira/browse/KAFKA-13310, we tried to fix an issue where consumer#poll(duration) returns later than the provided duration. The cause was that, when a rebalance is needed, we first commit the current offsets synchronously before rebalancing. If the offset commit takes too long, consumer#poll spends more time than the provided duration. To fix that, we changed the synchronous commit to an asynchronous commit before the rebalance (i.e. in onJoinPrepare).

       

      However, in this ticket, we found that the async commit keeps sending a new commit request on each Consumer#poll, because the offset commit never completes in time. The impact is that the existing consumer is kicked out of the group after the rebalance timeout without having rejoined the group. That is, suppose consumer A is in group G and consumer B joins the group: after the rebalance, only consumer B is in the group.

       

      The workaround for this issue is to switch the assignor back to an eager assignor, e.g. StickyAssignor or RoundRobinAssignor.
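      A minimal sketch of that workaround as consumer configuration. The class and method names (EagerAssignorWorkaround, consumerProps) are made up for illustration; the config key and the assignor class name are the standard consumer ones. The resulting Properties would be passed to the consumer as usual.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only.
class EagerAssignorWorkaround {
    // Builds consumer properties that select an eager assignor
    // instead of a cooperative one.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // StickyAssignor uses the eager rebalance protocol;
        // org.apache.kafka.clients.consumer.RoundRobinAssignor would also work.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.StickyAssignor");
        return props;
    }
}
```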

       

      To fix the issue, we came up with 2 solutions:

      1. We can explicitly wait for the async commit to complete in onJoinPrepare, but that would let the KAFKA-13310 issue happen again.
      2. We can keep track of the async offset commit future that is currently in flight, so that each Consumer#poll waits on that same future to complete.
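
      To illustrate solution 2, here is a rough sketch of the idea only, not the actual ConsumerCoordinator change. It uses java.util.concurrent.CompletableFuture in place of Kafka's internal request future, and the names InflightCommitTracker, sendAsyncCommit and commitCompleted are hypothetical. Retry and error handling are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical model of solution 2; not the real coordinator code.
class InflightCommitTracker {
    private final Supplier<CompletableFuture<Void>> sendAsyncCommit;
    private CompletableFuture<Void> inflight; // null when no commit is pending
    private int requestsSent = 0;

    InflightCommitTracker(Supplier<CompletableFuture<Void>> sendAsyncCommit) {
        this.sendAsyncCommit = sendAsyncCommit;
    }

    // Intended to be called once per poll while a rebalance is pending.
    // Returns true once the tracked commit has finished (normally or exceptionally).
    boolean commitCompleted() {
        if (inflight == null) {
            // Only send a request when none is in flight.
            inflight = sendAsyncCommit.get();
            requestsSent++;
        }
        if (!inflight.isDone()) {
            return false; // keep waiting on the same future next poll
        }
        inflight = null; // finished; a later rebalance may send a new commit
        return true;
    }

    int requestsSent() {
        return requestsSent;
    }
}
```

      With this shape, repeated polls check the same future, so a slow commit response results in one outstanding request rather than one per poll.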

       

      Besides, another bug was found while fixing this one. Before KAFKA-13310, we committed offsets synchronously with the rebalanceTimeout, which retried on retriable errors until the timeout. After KAFKA-13310, we thought we still had retries, but the retry happens after the partitions are revoked. That is, even if the retried offset commit succeeds, some partitions' offsets are left uncommitted, and after the rebalance other consumers will consume overlapping records.

       

       

      ===

      https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752

       

      We don't wait for the client to receive the offset commit response here, so onJoinPrepareAsyncCommitCompleted will be false in a cooperative rebalance, and the client will loop, invoking onJoinPrepare repeatedly.
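
      The loop described above can be modelled roughly as follows. This is a simplified stand-in, not the code at the linked line: CompletableFuture replaces Kafka's internal request future, and ResendEveryPollModel and sendAsyncCommit are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical model of the reported behaviour; not the real coordinator code.
class ResendEveryPollModel {
    private int requestsSent = 0;

    // Each call sends a fresh async commit and checks it immediately.
    // The response cannot have been read yet, so the check keeps
    // returning false and the caller invokes this method again.
    boolean onJoinPrepare(Supplier<CompletableFuture<Void>> sendAsyncCommit) {
        CompletableFuture<Void> future = sendAsyncCommit.get();
        requestsSent++;
        return future.isDone();
    }

    int requestsSent() {
        return requestsSent;
    }
}
```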

      I think EAGER mode doesn't have this problem because it revokes the partitions even if onJoinPrepareAsyncCommitCompleted=false, and so will not try to commit in the next round.

      Steps to reproduce:

      • single-node Kafka, broker version 3.2.0 and client version 3.2.0
      • topic1 has 5 partitions
      • start consumer1 (cooperative rebalance)
      • start another consumer, consumer2 (same consumer group)
      • consumer1 hangs for a long time before re-joining
      • the server log shows consumer1 hitting the rebalance timeout before JoinGroup and re-joining with another memberId

      consumer1's log keeps printing:

      16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation 54 and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938 (ConsumerCoordinator.java:739)
      16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of offsets {topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} (ConsumerCoordinator.java:1143)

       

      and coordinator's log:

      [2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance group xxx in state PreparingRebalance with old generation 56 (__consumer_offsets-30) (reason: Adding new member consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
      [2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed dynamic members who haven't joined: Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a) (kafka.coordinator.group.GroupCoordinator)
      [2022-06-26 17:00:43,856] INFO [GroupCoordinator 0]: Stabilized group xxx generation 57 (__consumer_offsets-30) with 3 members (kafka.coordinator.group.GroupCoordinator)
      [2022-06-26 17:00:44,048] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group xxx in CompletingRebalance state. Created a new member id consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
      [2022-06-26 17:00:44,053] INFO [GroupCoordinator 0]: Assignment received from leader consumer-xxx-1-e842a14c-eff7-4b55-9463-72b9c2534afd for group xxx for generation 57. The group has 3 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
      [2022-06-26 17:00:44,243] INFO [GroupCoordinator 0]: Preparing to rebalance group xxx in state PreparingRebalance with old generation 57 (__consumer_offsets-30) (reason: Adding new member consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)

            People

              aiquestion Shawn Wang
              aiquestion Shawn Wang
              Luke Chen Luke Chen