KAFKA-13435

Static membership protocol should let the leader skip assignment (KIP-814)


Details

    Description

      When a consumer group uses static membership and the consumer marked as leader has restarted, metadata changes such as a partition increase no longer trigger the expected rebalance.

      To reproduce this issue, simply:

      1. Create a static consumer subscribed to a single topic
      2. Close the consumer and create a new one with the same group instance id
      3. Increase partitions for the topic
      4. Observe that no rebalance occurs and the new partitions are not assigned

      I have only tested this in 2.7, but it may apply to newer versions as well.

      Analysis

      In ConsumerCoordinator, one responsibility of the leader consumer is to track metadata and trigger a rebalance if there are changes such as new partitions added:

      https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793

      if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) {
          ...
          requestRejoinIfNecessary(reason);
          return true;
      }
      

      Note that assignmentSnapshot is currently only set if the consumer is the leader:

      https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353

      // Only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
      if (!isLeader)
          assignmentSnapshot = null;
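
To make the interaction of the two snippets above concrete, here is a minimal, self-contained sketch. It is not the real ConsumerCoordinator code: the class name, the method names, and the topic name "orders" are hypothetical, and a metadata snapshot is modeled as a plain map from topic name to partition count. It only illustrates that a member whose assignmentSnapshot is null can never request a rejoin, no matter how the metadata changes.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the leader-only metadata check.
// These names are illustrative and are not the real ConsumerCoordinator internals.
class MetadataCheckSketch {

    // A snapshot is modeled as topic name -> partition count.
    // A null assignmentSnapshot models a member that is not the leader.
    static boolean rejoinNeeded(Map<String, Integer> assignmentSnapshot,
                                Map<String, Integer> metadataSnapshot) {
        return assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot);
    }

    // Mirrors the quoted rule: only the leader keeps a snapshot after joining.
    static Map<String, Integer> snapshotAfterJoin(boolean isLeader,
                                                  Map<String, Integer> metadata) {
        return isLeader ? new HashMap<>(metadata) : null;
    }

    public static void main(String[] args) {
        Map<String, Integer> atJoin = new HashMap<>();
        atJoin.put("orders", 3);          // hypothetical topic with 3 partitions
        Map<String, Integer> afterIncrease = new HashMap<>();
        afterIncrease.put("orders", 6);   // partitions increased to 6

        Map<String, Integer> leaderSnapshot = snapshotAfterJoin(true, atJoin);
        Map<String, Integer> followerSnapshot = snapshotAfterJoin(false, atJoin);

        // The leader notices the change; a non-leader never does.
        System.out.println("leader rejoins:   " + rejoinNeeded(leaderSnapshot, afterIncrease));
        System.out.println("follower rejoins: " + rejoinNeeded(followerSnapshot, afterIncrease));
    }
}
```

In this model the partition increase is detected only by the member that kept a snapshot, which is why a group in which no member believes it is the leader stops reacting to metadata changes.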
      

      And isLeader is only true after an assignment is performed during a rebalance:

      https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634

      That is, when a consumer group forms, exactly one consumer in the group should have isLeader == true and be responsible for triggering rebalances on metadata changes.

      However, with static membership, if the leader is restarted and rejoins the group, the group effectively no longer has a current leader: isLeader is false for all members. Even though the metadata changes are fetched, no rebalance is triggered.

      This issue does not resolve until an actual group change causes a proper rebalance. To safely increase partitions when using static membership, consumers must be stopped and left to time out, or be forcibly removed with AdminClient.removeMembersFromConsumerGroup().

      Correcting this in the client probably also requires help from the broker. Currently, when a static consumer that is leader is restarted, the coordinator does recognize the change:

      e.g. leader bbfcb930-61a3-4d21-945c-85f4576490ff was restarted

      [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test with unknown member id rejoins, assigning new member id 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74, while old member id 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff will be removed. (kafka.coordinator.group.GroupCoordinator)

      However, it does not attempt to update the leader id since this isn't a new rebalance, and JOIN_GROUP will continue returning the now stale member id as leader:

      2021-11-04 13:53:13,490 DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer instanceId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, clientId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0, groupId=ryan_test] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=40, protocolType='consumer', protocolName='range', leader='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff', memberId='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74', members=[])

      This means that it's not easy for any particular restarted member to identify that it should consider itself leader and handle metadata changes.
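
The behaviour described above can be illustrated with a toy model. This is only a sketch under stated assumptions: the class, its fields, and the member id format are hypothetical and much simpler than the real GroupCoordinator. The one rule it encodes is taken from the logs above: when a known static member rejoins, its member id is replaced but the stored leader id is left untouched.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of a group with static members; all names here are hypothetical.
class StaleLeaderSketch {
    // group.instance.id -> current member id
    final Map<String, String> staticMembers = new LinkedHashMap<>();
    String leaderId;
    private int nextSuffix = 0;

    // First join of an instance stands in for a real rebalance:
    // the first member to join becomes leader.
    // Rejoin of a known instance swaps the member id but keeps leaderId as-is.
    String join(String instanceId) {
        boolean known = staticMembers.containsKey(instanceId);
        String memberId = instanceId + "-" + (nextSuffix++);
        staticMembers.put(instanceId, memberId);
        if (!known && leaderId == null) {
            leaderId = memberId;
        }
        return memberId;
    }

    // What each client effectively computes from the JoinGroup response.
    boolean isLeader(String memberId) {
        return memberId.equals(leaderId);
    }

    // True if at least one current member would consider itself leader.
    boolean anyLeader() {
        for (String memberId : staticMembers.values()) {
            if (isLeader(memberId)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        StaleLeaderSketch group = new StaleLeaderSketch();
        group.join("instance-a");   // becomes leader
        group.join("instance-b");
        System.out.println("has leader before restart: " + group.anyLeader());

        String rejoined = group.join("instance-a");   // leader restarts with same instance id
        System.out.println("stored leader id: " + group.leaderId);
        System.out.println("rejoined member id: " + rejoined);
        System.out.println("has leader after restart: " + group.anyLeader());
    }
}
```

After the restart, the stored leader id refers to a member id that no longer exists in the group, so the comparison fails for every current member, including the one that was leader before it restarted.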

      KAFKA-7728 refers to the difficulty of leader restarts, but its focus seemed to be mainly on avoiding needless rebalances for static members. That goal was accomplished, but this issue seems to be a side effect of both not rebalancing AND not having the rejoined member reclaim its leadership status.

      Also, I noticed that KAFKA-12759 has been opened, though I have not verified whether it is valid or strictly related.


          People

            dajac David Jacot
            rleslie Ryan Leslie
            Jason Gustafson Jason Gustafson