Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-13581

Error getting old protocol



    • Bug
    • Status: Open
    • Minor
    • Resolution: Unresolved
    • 2.5.0
    • 2.5.0
    • core
    • None


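      In this case, oldProtocols will always be equal to the new protocols, because knownStaticMember has already been updated by group.updateMember before oldProtocols is read. As a result, when storeGroup fails, the revert call group.updateMember(knownStaticMember, oldProtocols, null) re-applies the new protocols instead of restoring the previous ones. So, I think oldProtocols should be assigned before the call to updateMember.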

      private def updateStaticMemberAndRebalance(group: GroupMetadata,
                                                 newMemberId: String,
                                                 groupInstanceId: Option[String],
                                                 protocols: List[(String, Array[Byte])],
                                                 responseCallback: JoinCallback): Unit = {
        val oldMemberId = group.getStaticMemberId(groupInstanceId)
        info(s"Static member $groupInstanceId of group ${group.groupId} with unknown member id rejoins, assigning new member id $newMemberId, while " +
          s"old member id $oldMemberId will be removed.")
        val currentLeader = group.leaderOrNull
        val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
        // Heartbeat of old member id will expire without effect since the group no longer contains that member id.
        // New heartbeat shall be scheduled with new member id.
        completeAndScheduleNextHeartbeatExpiration(group, member)
        val knownStaticMember = group.get(newMemberId)
        group.updateMember(knownStaticMember, protocols, responseCallback)
        val oldProtocols = knownStaticMember.supportedProtocols
        group.currentState match {
          case Stable =>
            // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
            // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
            val selectedProtocolOfNextGeneration = group.selectProtocol
            if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
              info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
              val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
              groupManager.storeGroup(group, groupAssignment, error => {
                if (error != Errors.NONE) {
                  warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
                  // Failed to persist member.id of the given static member, revert the update of the static member in the group.
                  group.updateMember(knownStaticMember, oldProtocols, null)
                  val oldMember = group.replaceGroupInstance(newMemberId, oldMemberId, groupInstanceId)
                  completeAndScheduleNextHeartbeatExpiration(group, oldMember)
                    memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
                    generationId = group.generationId,
                    protocolType = group.protocolType,
                    protocolName = group.protocolName,
                    leaderId = currentLeader,
                    error = error
                } else {
                  group.maybeInvokeJoinCallback(member, JoinGroupResult(
                    members = List.empty,
                    memberId = newMemberId,
                    generationId = group.generationId,
                    protocolType = group.protocolType,
                    protocolName = group.protocolName,
                    // We want to avoid current leader performing trivial assignment while the group
                    // is in stable stage, because the new assignment in leader's next sync call
                    // won't be broadcast by a stable group. This could be guaranteed by
                    // always returning the old leader id so that the current leader won't assume itself
                    // as a leader based on the returned message, since the new member.id won't match
                    // returned leader id, therefore no assignment will be performed.
                    leaderId = currentLeader,
                    error = Errors.NONE))
            } else {
              maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol")
          case CompletingRebalance =>
            // if the group is in after-sync stage, upon getting a new join-group of a known static member
            // we should still trigger a new rebalance, since the old member may already be sent to the leader
            // for assignment, and hence when the assignment gets back there would be a mismatch of the old member id
            // with the new replaced member id. As a result the new member id would not get any assignment.
            prepareRebalance(group, s"Updating metadata for static member ${member.memberId} with instance id $groupInstanceId")
          case Empty | Dead =>
            throw new IllegalStateException(s"Group ${group.groupId} was not supposed to be " +
              s"in the state ${group.currentState} when the unknown static member $groupInstanceId rejoins.")
          case PreparingRebalance =>




            Unassigned Unassigned
            chenzy chenzhongyu
            0 Vote for this issue
            3 Start watching this issue