Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-9885

Evict last members of a group when the maximum allowed is reached

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 2.6.0
    • None
    • None

    Description

      While analysing https://issues.apache.org/jira/browse/KAFKA-7965, we found that multiple members of a group can be evicted from a group if the leader of the consumer offset partition changes before the group is persisted. This happens because the current evection logic always evict the first member which rejoins the group.

      We would like to change the evection logic so that the last members to rejoin the group are kicked out instead.

      Here is an example of what happens when the leader changes:

      // Group is loaded in GroupCoordinator 0
      // A rebalance is triggered because the group is over capacity
      [2020-04-02 11:14:33,393] INFO [GroupMetadataManager brokerId=0] Scheduling loading of offsets and group metadata from __consumer_offsets-0 (kafka.coordinator.group.GroupMetadataManager:66)
      [2020-04-02 11:14:33,406] INFO [Consumer clientId=ConsumerTestConsumer, groupId=group-max-size-test] Discovered group coordinator localhost:40071 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:794)
      [2020-04-02 11:14:33,409] INFO Static member MemberMetadata(memberId=ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3, groupInstanceId=Some(null), clientId=ConsumerTestConsumer, clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test loaded with member id ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3 at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
      [2020-04-02 11:14:33,410] INFO Static member MemberMetadata(memberId=ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb, groupInstanceId=Some(null), clientId=ConsumerTestConsumer, clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test loaded with member id ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
      [2020-04-02 11:14:33,412] INFO Static member MemberMetadata(memberId=ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d, groupInstanceId=Some(null), clientId=ConsumerTestConsumer, clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test loaded with member id ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
      [2020-04-02 11:14:33,413] INFO [GroupCoordinator 0]: Loading group metadata for group-max-size-test with generation 1 (kafka.coordinator.group.GroupCoordinator:66)
      [2020-04-02 11:14:33,413] INFO [GroupCoordinator 0]: Preparing to rebalance group group-max-size-test in state PreparingRebalance with old generation 1 (__consumer_offsets-0) (reason: Freshly-loaded group is over capacity (GroupConfig(10,1800000,2,0).groupMaxSize). Rebalacing in order to give a chance for consumers to commit offsets) (kafka.coordinator.group.GroupCoordinator:66)
      [2020-04-02 11:14:33,431] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-0 in 28 milliseconds, of which 0 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager:66)
      
      // A first consumer is kicked out of the group while trying to re-join
      [2020-04-02 11:14:33,449] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=group-max-size-test] Attempt to join group failed due to fatal error: The consumer group has reached its max size. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:627)
      [2020-04-02 11:14:33,451] ERROR [daemon-consumer-assignment-2]: Error due to (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76)
      org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group group-max-size-test already has the configured maximum number of members.
      [2020-04-02 11:14:33,451] INFO [daemon-consumer-assignment-2]: Stopped (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:66)
      
      // Before the rebalance is completed, a preferred replica leader election kicks in and move the leader from 0 to 1
      [2020-04-02 11:14:34,155] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController:66)
      [2020-04-02 11:14:34,169] INFO [Controller id=0] Starting replica leader election (PREFERRED) for partitions group-max-size-test-0,group-max-size-test-3,__consumer_offsets-0 triggered by AutoTriggered (kafka.controller.KafkaController:66)
      
      // The group is loaded in GroupCoordinator 1 before completing the rebalance
      // Another rebalance is triggered because the group is still over capacity
      [2020-04-02 11:14:34,194] INFO [GroupMetadataManager brokerId=1] Scheduling loading of offsets and group metadata from __consumer_offsets-0 (kafka.coordinator.group.GroupMetadataManager:66)
      [2020-04-02 11:14:34,199] INFO Static member MemberMetadata(memberId=ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3, groupInstanceId=Some(null), clientId=ConsumerTestConsumer, clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test loaded with member id ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3 at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
      [2020-04-02 11:14:34,199] INFO Static member MemberMetadata(memberId=ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb, groupInstanceId=Some(null), clientId=ConsumerTestConsumer, clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test loaded with member id ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
      [2020-04-02 11:14:34,199] INFO Static member MemberMetadata(memberId=ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d, groupInstanceId=Some(null), clientId=ConsumerTestConsumer, clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test loaded with member id ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
      [2020-04-02 11:14:34,201] INFO [GroupCoordinator 1]: Loading group metadata for group-max-size-test with generation 1 (kafka.coordinator.group.GroupCoordinator:66)
      [2020-04-02 11:14:34,202] INFO [GroupCoordinator 1]: Preparing to rebalance group group-max-size-test in state PreparingRebalance with old generation 1 (__consumer_offsets-0) (reason: Freshly-loaded group is over capacity (GroupConfig(10,1800000,2,0).groupMaxSize). Rebalacing in order to give a chance for consumers to commit offsets) (kafka.coordinator.group.GroupCoordinator:66)
      [2020-04-02 11:14:34,203] INFO [GroupMetadataManager brokerId=1] Finished loading offsets and group metadata from __consumer_offsets-0 in 9 milliseconds, of which 0 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager:66)
      
      // Prefered leader election is completed
      [2020-04-02 11:14:34,235] INFO [Controller id=0] Partition __consumer_offsets-0 completed preferred replica leader election. New leader is 1 (kafka.controller.KafkaController:66)
      
      // Group is unloaded from GroupCoordinator 0
      [2020-04-02 11:14:34,237] INFO [GroupMetadataManager brokerId=0] Scheduling unloading of offsets and group metadata from __consumer_offsets-0 (kafka.coordinator.group.GroupMetadataManager:66)
      [2020-04-02 11:14:34,237] INFO [GroupCoordinator 0]: Unloading group metadata for group-max-size-test with generation 1 (kafka.coordinator.group.GroupCoordinator:66)
      [2020-04-02 11:14:34,238] INFO [GroupMetadataManager brokerId=0] Finished unloading __consumer_offsets-0. Removed 0 cached offsets and 1 cached groups. (kafka.coordinator.group.GroupMetadataManager:66)
      
      // A second consumer is kicked out of the group while trying to re-join
      [2020-04-02 11:14:34,252] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=group-max-size-test] Attempt to join group failed due to fatal error: The consumer group has reached its max size. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:627)
      [2020-04-02 11:14:34,254] ERROR [daemon-consumer-assignment-1]: Error due to (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76)
      org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group group-max-size-test already has the configured maximum number of members.
      [2020-04-02 11:14:34,254] INFO [daemon-consumer-assignment-1]: Stopped (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:66) 

      Attachments

        Issue Links

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            dajac David Jacot
            dajac David Jacot
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment