Kafka / KAFKA-7018

persist memberId for consumer restart


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.4.0
    • Component/s: consumer, streams
    • Labels: None

    Description

      In the group coordinator, there is logic that skips the rebalance for JoinGroup requests from existing follower consumers:

      case Empty | Stable =>
        if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
          // if the member id is unknown, register the member to the group
          addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
        } else {
          val member = group.get(memberId)
          if (group.isLeader(memberId) || !member.matches(protocols)) {
            // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
            // The latter allows the leader to trigger rebalances for changes affecting assignment
            // which do not affect the member metadata (such as topic metadata changes for the consumer)
            updateMemberAndRebalance(group, member, protocols, responseCallback)
          } else {
            // for followers with no actual change to their metadata, just return group information
            // for the current generation which will allow them to issue SyncGroup
            responseCallback(JoinGroupResult(
              members = Map.empty,
              memberId = memberId,
              generationId = group.generationId,
              subProtocol = group.protocolOrNull,
              leaderId = group.leaderOrNull,
              error = Errors.NONE))
          }
        }
      
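      To make the three branches of the excerpt explicit, here is a toy Scala model of the decision the coordinator takes when a group in the Empty or Stable state receives a JoinGroup. It is a sketch only: JoinGroupModel, decideJoin, ToyMember and the decision names are hypothetical and not part of Kafka, member.matches(protocols) is approximated by list equality, and the model assumes a non-empty memberId is already registered, just as the excerpt's group.get(memberId) does.

```scala
// Toy model of the JoinGroup branch quoted above (group in Empty or Stable state).
// All names here are hypothetical; they are not Kafka's GroupCoordinator API.
object JoinGroupModel {
  val UnknownMemberId = "" // JoinGroupRequest.UNKNOWN_MEMBER_ID is the empty string

  sealed trait JoinDecision
  case object AddMemberAndRebalance extends JoinDecision    // new member: register and rebalance
  case object UpdateMemberAndRebalance extends JoinDecision // leader rejoin or changed metadata
  case object ReturnCurrentGeneration extends JoinDecision  // unchanged follower: no rebalance

  // Simplification: a member's metadata is just its list of protocol names.
  final case class ToyMember(id: String, protocols: List[String])

  // Assumes a non-empty memberId is present in `members` (throws otherwise).
  def decideJoin(
      members: Map[String, ToyMember],
      leaderId: Option[String],
      memberId: String,
      protocols: List[String]): JoinDecision =
    if (memberId == UnknownMemberId) AddMemberAndRebalance
    else {
      val member = members(memberId)
      // Stand-in for `group.isLeader(memberId) || !member.matches(protocols)`
      if (leaderId.contains(memberId) || member.protocols != protocols) UpdateMemberAndRebalance
      else ReturnCurrentGeneration
    }
}
```

      For a group with leader "leader-1" and follower "follower-1", both on protocol "range", a rejoin from the follower with unchanged protocols yields ReturnCurrentGeneration, whereas any JoinGroup carrying UnknownMemberId yields AddMemberAndRebalance. That second branch is the one a restarted consumer hits.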

      While looking at the AbstractCoordinator, I found that the generation is hard-coded to NO_GENERATION on restart, which means the first JoinGroup request carries UNKNOWN_MEMBER_ID. The restarted consumer is therefore treated as a new member: a rebalance is triggered, and the group does not stabilize until the session of the previous memberId times out.
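      The restart problem can be illustrated with a small simulation under heavy simplifying assumptions: ToyGroup, join and expireSession are hypothetical names, heartbeats and rebalance timeouts are not modeled, rejoining followers are assumed to have unchanged metadata, and rebalanceTriggers simply counts the membership changes that force the coordinator to rebalance (the real broker may fold several of them into one prolonged rebalance).

```scala
// Toy simulation of a consumer restart; names are hypothetical, not Kafka APIs.
object RestartModel {
  val UnknownMemberId = "" // value of JoinGroupRequest.UNKNOWN_MEMBER_ID

  final case class ToyGroup(members: Set[String], rebalanceTriggers: Int, nextId: Int) {
    // A known id (follower with unchanged metadata) rejoins without a rebalance.
    // An empty id registers a fresh member and triggers a rebalance. Simplification:
    // an unknown non-empty id is also registered fresh here, whereas the real
    // coordinator would reply UNKNOWN_MEMBER_ID and the client would rejoin with "".
    def join(memberId: String): (ToyGroup, String) =
      if (memberId != UnknownMemberId && members.contains(memberId)) (this, memberId)
      else {
        val fresh = s"consumer-$nextId"
        (copy(members = members + fresh, rebalanceTriggers = rebalanceTriggers + 1, nextId = nextId + 1), fresh)
      }

    // Dropping a member whose session timed out changes membership, so it triggers again.
    def expireSession(memberId: String): ToyGroup =
      if (members.contains(memberId))
        copy(members = members - memberId, rebalanceTriggers = rebalanceTriggers + 1)
      else this
  }
}
```

      Restarting without a stored id adds a second entry (consumer-1) next to the stale consumer-0 and costs one rebalance trigger on join plus another when consumer-0's session expires; rejoining with the persisted id consumer-0 leaves the group untouched.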

      I'm trying to clarify the following things before we extend the discussion:

      1. Whether my understanding of the above logic is correct (I hope mjsax can help me double-check).
      2. Whether it makes sense to persist the memberId from the previous generation for consumers. We currently only need this feature in Streams applications, but it would do no harm to use it for consumers in general. It would be a nice-to-have on consumer restart when the loading-previous-memberId option is set to true. If loading fails, we simply fall back to UNKNOWN_MEMBER_ID.
      3. The behavior could also be changed on the broker side, but I suspect that would be very risky. So far, a client-side change looks like the least effort. The end goal is to avoid excessive rebalances when the same consumer restarts, so if you think a server-side change could also help, we can discuss it further.
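      Item 2 could look roughly like the following on the client side. This is a hypothetical sketch, not the Kafka client's code: MemberIdStore, the file location and the loadPrevious flag (standing in for the loading-previous-memberId option) are all illustrative. A reloaded id presumably only helps if the restart happens before the coordinator expires the old session; otherwise the coordinator no longer knows the id and the consumer has to rejoin with UNKNOWN_MEMBER_ID anyway.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.util.Try

// Hypothetical helper: persist the last memberId locally and reload it on restart,
// falling back to UNKNOWN_MEMBER_ID ("") on any failure. Not part of the Kafka client.
object MemberIdStore {
  val UnknownMemberId = "" // value of JoinGroupRequest.UNKNOWN_MEMBER_ID

  // Write the memberId assigned in the latest successful JoinGroup response.
  def save(file: Path, memberId: String): Unit =
    Files.write(file, memberId.getBytes(StandardCharsets.UTF_8))

  // Reload only when enabled (the loading-previous-memberId idea); a missing file
  // or I/O error yields UnknownMemberId, i.e. the consumer joins as a new member.
  def load(file: Path, loadPrevious: Boolean): String =
    if (!loadPrevious) UnknownMemberId
    else {
      val stored = Try(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim)
      stored.getOrElse(UnknownMemberId)
    }
}
```

      Calling load with loadPrevious = false, or against a missing file, returns the empty UnknownMemberId, which matches the fallback described in item 2.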

      Thank you for helping out! mjsax guozhang

       

People

    • Assignee: Boyang Chen (bchen225242)
    • Reporter: Boyang Chen (bchen225242)
    • Votes: 1
    • Watchers: 7
