  1. Kafka
  2. KAFKA-14639

Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance cycle


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.2.1
    • Fix Version/s: 3.5.0, 3.4.1
    • Component/s: clients, consumer
    • Labels: None

    Description

      I have an application that runs 6 consumers in parallel, and I am getting unexpected results with CooperativeStickyAssignor. If I understand the mechanism correctly, when a consumer loses a partition in one rebalance cycle, that partition should only be assigned to another consumer in the next rebalance cycle.

      This assumption is based on the RebalanceProtocol documentation and a few blog posts that describe the protocol, like this one on the Confluent blog.

      The assignor should not reassign any owned partitions immediately, but instead may indicate consumers the need for partition revocation so that the revoked partitions can be reassigned to other consumers in the next rebalance event. This is designed for sticky assignment logic which attempts to minimize partition reassignment with cooperative adjustments.

      Any member that revoked partitions then rejoins the group, triggering a second rebalance so that its revoked partitions can be assigned. Until then, these partitions are unowned and unassigned.
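
      To make the expectation concrete, here is a minimal sketch of the two-round behaviour described in the quotes above. It is an illustration under stated assumptions, not Kafka's implementation: the function name `cooperative_adjust`, the consumer names `c1`/`c2`, and the partition names `p0`..`p2` are all hypothetical.

```python
def cooperative_adjust(owned, target):
    """Return the assignment a cooperative assignor could hand out this round.

    owned:  consumer -> set of partitions it owns going into the rebalance
    target: consumer -> set of partitions the balanced plan wants it to own

    Hypothetical model of the documented rule, not Kafka's actual code.
    """
    # Map each partition to its current owner, if any.
    current_owner = {p: c for c, parts in owned.items() for p in parts}

    adjusted = {}
    for consumer, parts in target.items():
        adjusted[consumer] = {
            p for p in parts
            # Withhold a partition that another consumer still owns;
            # an unowned partition can be handed out immediately.
            if current_owner.get(p, consumer) == consumer
        }
    return adjusted


owned = {"c1": {"p0", "p1"}, "c2": {"p2"}}
target = {"c1": {"p0"}, "c2": {"p1", "p2"}}

# Round 1: c1 is told to give up p1, but c2 does not receive it yet.
first_round = cooperative_adjust(owned, target)

# Round 2: c1 has revoked p1 and rejoined, so p1 is unowned and can move.
second_round = cooperative_adjust(first_round, target)

print(first_round)
print(second_round)
```

      Under this model, a partition that changes owner is always unowned for one round in between, which is the property I expected to see in the logs.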

      These are the logs from the application, which uses protocol='cooperative-sticky'. In the same rebalance cycle (generationId=640), partition 74 moves from consumer-3 to consumer-4. I omitted the lines logged by the other 4 consumers.

      Note that the log is in reverse chronological order (read bottom to top).

      2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : New partition assignment: partition-59, seek to min common offset: 85120524
      2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions [partition-59] assigned successfully
      2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions assigned: [partition-59]
      2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Adding newly assigned partitions: partition-59
      2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Notifying assignor about the new Assignment(partitions=[partition-59])
      2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Request joining group due to: need to revoke partitions [partition-26, partition-74] as indicated by the current assignment and re-join
      2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions [partition-26, partition-74] revoked successfully
      2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Finished removing partition data
      2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] (Re-)joining group
      2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : New partition assignment: partition-74, seek to min common offset: 107317730
      2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : Partitions [partition-74] assigned successfully
      2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions assigned: [partition-74]
      2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Adding newly assigned partitions: partition-74
      2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Notifying assignor about the new Assignment(partitions=[partition-74])
      2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Request joining group due to: need to revoke partitions [partition-57] as indicated by the current assignment and re-join
      2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : Partitions [partition-57] revoked successfully
      2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Finished removing partition data
      2022-12-14 11:18:22 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions revoked: [partition-26, partition-74]
      2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Revoke previously assigned partitions partition-26, partition-74
      2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Updating assignment with\n\tAssigned partitions: [partition-59]\n\tCurrent owned partitions: [partition-26, partition-74]\n\tAdded partitions (assigned - owned): [partition-59]\n\tRevoked partitions (owned - assigned): [partition-26, partition-74]
      2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Successfully synced group in generation Generation{generationId=640, memberId='partition-3-my-client-id-my-group-id-c31afd19-3f22-43cb-ad07-9088aa98d3af', protocol='cooperative-sticky'}
      2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Successfully joined group with generation Generation{generationId=640, memberId='partition-3-my-client-id-my-group-id-c31afd19-3f22-43cb-ad07-9088aa98d3af', protocol='cooperative-sticky'}
      2022-12-14 11:18:22 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions revoked: [partition-57]
      2022-12-14 11:18:22 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Revoke previously assigned partitions partition-57
      2022-12-14 11:18:22 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Updating assignment with\n\tAssigned partitions: [partition-74]\n\tCurrent owned partitions: [partition-57]\n\tAdded partitions (assigned - owned): [partition-74]\n\tRevoked partitions (owned - assigned): [partition-57]
      2022-12-14 11:18:21 1 — [id-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Successfully synced group in generation Generation{generationId=640, memberId='partition-4-my-client-id-my-group-id-ae2af665-edc9-4a8e-b658-98372d142477', protocol='cooperative-sticky'}
      2022-12-14 11:18:21 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Successfully joined group with generation Generation{generationId=640, memberId='partition-4-my-client-id-my-group-id-ae2af665-edc9-4a8e-b658-98372d142477', protocol='cooperative-sticky'} 

      Is this expected?

      Kafka client version is 3.2.1.
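
      The observation can also be checked mechanically. The sketch below takes the "Current owned partitions" and "Assigned partitions" values from the two "Updating assignment with" log lines for generation 640 and reports any partition handed straight from one consumer to another. The helper name `find_direct_transfers` is hypothetical, and because the other 4 consumers are omitted, only transfers between consumer-3 and consumer-4 can be detected here.

```python
def find_direct_transfers(owned, assigned):
    """Find partitions assigned to a new owner while another still owned them.

    owned:    consumer -> partitions owned before the rebalance
    assigned: consumer -> partitions assigned in the same generation
    Returns partition -> (previous_owner, new_owner).
    """
    current_owner = {p: c for c, parts in owned.items() for p in parts}

    transfers = {}
    for consumer, parts in assigned.items():
        for p in parts:
            prev = current_owner.get(p)
            # A known previous owner that differs from the new one means
            # the partition moved without an unowned round in between.
            if prev is not None and prev != consumer:
                transfers[p] = (prev, consumer)
    return transfers


# Values copied from the generation 640 log lines above.
owned = {
    "consumer-3": {"partition-26", "partition-74"},
    "consumer-4": {"partition-57"},
}
assigned = {
    "consumer-3": {"partition-59"},
    "consumer-4": {"partition-74"},
}

transfers = find_direct_transfers(owned, assigned)
print(transfers)  # {'partition-74': ('consumer-3', 'consumer-4')}
```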

      Attachments

        1. consumers-jira.log
          47 kB
          Bojan Blagojevic

          People

            pnee Philip Nee
            bojanblagojevic Bojan Blagojevic
            David Jacot David Jacot