  Kafka / KAFKA-12464

Enhance constrained sticky Assign algorithm


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.7.0
    • Fix Version/s: 3.0.0
    • Component/s: consumer

    Description

      In KAFKA-9987, we made a significant improvement for the case where all consumers are subscribed to the same set of topics. That algorithm has 4 phases:

      1. Reassign as many previously owned partitions as possible, up to the maxQuota
      2. Fill remaining members up to minQuota
      3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions from the over-full consumers at max capacity
      4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we should just distribute one partition each to all consumers at min capacity
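      The phases refer to minQuota and maxQuota without defining them here. The pseudocode further down calls them C_f and C_c: the floor and ceiling of partitions per consumer. Assuming that meaning, here is a minimal Java sketch. The class and method names are hypothetical and do not come from Kafka's code.

```java
// Hypothetical helper: per-member quotas when P partitions are spread over N consumers.
final class QuotaSketch {

    // minQuota (C_f): the floor of P / N
    static int minQuota(int numPartitions, int numConsumers) {
        return numPartitions / numConsumers;
    }

    // maxQuota (C_c): the ceiling of P / N, using integer arithmetic only
    static int maxQuota(int numPartitions, int numConsumers) {
        return (numPartitions + numConsumers - 1) / numConsumers;
    }

    public static void main(String[] args) {
        // 10 partitions over 3 consumers, as in the example below
        System.out.println(minQuota(10, 3) + " " + maxQuota(10, 3)); // prints "3 4"
    }
}
```

      For 10 partitions and 3 consumers this gives 3 and 4. That matches the maxQuota of 4 used in the example.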

       

      Here is an example to make this concrete:

      Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1, ... t1p9

      Suppose the current assignment is:

      C0: t1p0, t1p1, t1p2, t1p3, t1p4
      C1: t1p5, t1p6, t1p7, t1p8, t1p9

      Now a new consumer, C2, joins, so we do the following:

      1. Reassign as many previously owned partitions as possible, up to the maxQuota
        After this phase, the assignment will be (maxQuota is 4 and minQuota is 3):

      C0: t1p0, t1p1, t1p2, t1p3
      C1: t1p5, t1p6, t1p7, t1p8

      2. Fill remaining members up to minQuota
        After this phase, the assignment will be:

      C0: t1p0, t1p1, t1p2, t1p3
      C1: t1p5, t1p6, t1p7, t1p8
      C2: t1p4, t1p9

      3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions from the over-full consumers at max capacity
        After this phase, the assignment will be:

      C0: t1p0, t1p1, t1p2
      C1: t1p5, t1p6, t1p7, t1p8
      C2: t1p4, t1p9, t1p3

      4. Otherwise, we may have run out of unfilled consumers before assigning all partitions. In that case we should just distribute one partition each to all consumers at min capacity (this phase is not reached in this example)
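      Phase 3, the stealing step, is what the updated algorithm avoids. It can be sketched in Java as follows. This is a minimal illustration, not Kafka's code. Partitions are plain strings and the class and method names are hypothetical. The sketch also assumes that a donor gives up its last partition, which reproduces the move of t1p3 from C0 to C2 in the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the old phase 3: unfilled members take partitions
// from members at maxQuota until they reach minQuota.
final class StealPhaseSketch {

    static void steal(Map<String, List<String>> assignment, List<String> unfilledMembers,
                      Deque<String> maxCapacityMembers, int minQuota) {
        for (String member : unfilledMembers) {
            List<String> target = assignment.get(member);
            while (target.size() < minQuota && !maxCapacityMembers.isEmpty()) {
                // poll the first member at max capacity; each donor gives up one partition
                String donor = maxCapacityMembers.poll();
                List<String> donorPartitions = assignment.get(donor);
                // assumption: the donor gives up its last partition
                target.add(donorPartitions.remove(donorPartitions.size() - 1));
            }
        }
    }

    // State of the example after phase 2 (minQuota = 3), then phase 3 applied.
    static Map<String, List<String>> runExample() {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        assignment.put("C0", new ArrayList<>(List.of("t1p0", "t1p1", "t1p2", "t1p3")));
        assignment.put("C1", new ArrayList<>(List.of("t1p5", "t1p6", "t1p7", "t1p8")));
        assignment.put("C2", new ArrayList<>(List.of("t1p4", "t1p9")));
        steal(assignment, List.of("C2"), new ArrayDeque<>(List.of("C0", "C1")), 3);
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(runExample());
    }
}
```

      Running main prints {C0=[t1p0, t1p1, t1p2], C1=[t1p5, t1p6, t1p7, t1p8], C2=[t1p4, t1p9, t1p3]}. That is the same result as phase 3 in the example.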

       

       

      As we can see, this assignment needs 3 phases to complete, but it can actually be done in 2. Here's the updated algorithm:

      1. Reassign as many previously owned partitions as possible, up to maxQuota, while also respecting numMaxQuota. numMaxQuota is the number of members allowed to hold maxQuota partitions, which is the remainder of (partitions / consumers)
      2. Fill remaining members up to maxQuota if fewer than numMaxQuota members are currently at maxQuota; otherwise, fill them up to minQuota

       

      By taking numMaxQuota into account, step 1 no longer assigns too many partitions to consumers, and step 2 no longer assigns too few. As a result, we don't need steps 3 and 4 to rebalance them.
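      The counting behind numMaxQuota can be written with the C_f, C_c and C_r notation defined in the pseudocode below. If exactly C_r members end at the ceiling and the rest end at the floor, the sizes add up to exactly P. This is only the counting argument, not a proof that every run of the algorithm reaches that state.

```latex
\begin{aligned}
P &= N\,C_f + C_r, \qquad 0 \le C_r < N, \\
C_c &= C_f + 1 \quad \text{if } C_r > 0 \quad (\text{otherwise } C_c = C_f), \\
C_r\,C_c + (N - C_r)\,C_f &= C_r\,(C_f + 1) + (N - C_r)\,C_f = N\,C_f + C_r = P.
\end{aligned}
```

      For the example above (P = 10, N = 3), this gives C_f = 3, C_c = 4 and C_r = 1, and 1*4 + 2*3 = 10.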

       

      So, the updated pseudocode sketch of the algorithm is:

      C_f := (P/N)_floor, the floor capacity
      C_c := (P/N)_ceil, the ceiling capacity

      C_r := (P%N) the allowed number of members with C_c partitions assigned
      num_max_capacity_members := current number of members with C_c partitions assigned (default to 0)

      members := the sorted set of all consumers
      partitions := the set of all partitions
      unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
      unfilled_members := the set of consumers not yet at capacity, initialized to empty
      max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty
      member.owned_partitions := the set of previously owned partitions encoded in the Subscription

      // Reassign as many previously owned partitions as possible, by considering the num_max_capacity_members
      for member : members
           remove any partitions that are no longer in the subscription from its owned partitions
           remove all owned_partitions if the generation is old
           if member.owned_partitions.size < C_f
               assign all owned partitions to member and remove from unassigned_partitions
               add member to unfilled_members
           else if member.owned_partitions.size == C_f
               assign first C_f owned_partitions to member and remove from unassigned_partitions
           else if member.owned_partitions.size >= C_c && num_max_capacity_members < C_r
               assign first C_c owned_partitions to member and remove from unassigned_partitions
               num_max_capacity_members++
               // (removed) add member to max_capacity_members
           else
               assign first C_f owned_partitions to member and remove from unassigned_partitions

      sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1, ... (for data parallelism)
      sort unfilled_members by memberId (for determinism)

      // Fill remaining members up to C_c while fewer than C_r members have C_c partitions, otherwise up to C_f
      for member : unfilled_members
           compute the remaining capacity as:
               if num_max_capacity_members < C_r:
                   C = C_c - num_assigned_partitions
                   num_max_capacity_members++
               else
                   C = C_f - num_assigned_partitions
           pop the first C partitions from unassigned_partitions and assign to member

      // Steal partitions from members with max_capacity if necessary
      // (from the KAFKA-9987 algorithm; dropped in the final version below)
      if we run out of partitions before getting to the end of unfilled members:
           for member : unfilled_members
                poll for first member in max_capacity_members and remove one partition
                assign this partition to the unfilled member

      // Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary
      // (from the KAFKA-9987 algorithm; dropped in the final version below)
      if we run out of unfilled_members before assigning all partitions:
           for partition : unassigned_partitions
                assign to next member in members that is not in max_capacity_members (then add member to max_capacity_members)
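      The "partition order" used to sort unassigned_partitions puts partitions with the same number from different topics next to each other. A minimal Java comparator for it might look like the sketch below. Here, Tp is a hypothetical stand-in for Kafka's TopicPartition, which exposes topic() and partition(). Breaking ties by topic name is an assumption.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the "partition order" sort: t0_p0, t1_p0, t2_p0, t0_p1, t1_p1, ...
final class PartitionOrderSketch {

    // Minimal stand-in for a topic-partition pair.
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "_p" + partition;
        }
    }

    // Order by partition number first, then by topic name (the tie-break is an assumption).
    static final Comparator<Tp> PARTITION_ORDER =
            Comparator.comparingInt((Tp tp) -> tp.partition).thenComparing(tp -> tp.topic);

    // Returns the partitions' names in partition order, leaving the input untouched.
    static List<String> sortInPartitionOrder(List<Tp> partitions) {
        List<Tp> sorted = new ArrayList<>(partitions);
        sorted.sort(PARTITION_ORDER);
        List<String> names = new ArrayList<>();
        for (Tp tp : sorted) {
            names.add(tp.toString());
        }
        return names;
    }
}
```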

       

       

      With those two phases dropped, the final pseudocode is:

      C_f := (P/N)_floor, the floor capacity
      C_c := (P/N)_ceil, the ceiling capacity
      C_r := (P%N) the allowed number of members with C_c partitions assigned
      num_max_capacity_members := current number of members with C_c partitions assigned (default to 0)

      members := the sorted set of all consumers
      partitions := the set of all partitions
      unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
      unfilled_members := the set of consumers not yet at capacity, initialized to empty
      member.owned_partitions := the set of previously owned partitions encoded in the Subscription

      // Reassign as many previously owned partitions as possible, while respecting num_max_capacity_members
      for member : members
           remove any partitions that are no longer in the subscription from its owned partitions
           remove all owned_partitions if the generation is old
           if member.owned_partitions.size < C_f
               assign all owned partitions to member and remove from unassigned_partitions
               add member to unfilled_members
           else if member.owned_partitions.size >= C_c && num_max_capacity_members < C_r
               assign first C_c owned_partitions to member and remove from unassigned_partitions
               num_max_capacity_members++
           else
               assign first C_f owned_partitions to member and remove from unassigned_partitions

      sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1, ... (for data parallelism)
      sort unfilled_members by memberId (for determinism)

      // Fill remaining members up to C_c while fewer than C_r members have C_c partitions, otherwise up to C_f
      for member : unfilled_members
           compute the remaining capacity as:
               if num_max_capacity_members < C_r:
                   C = C_c - num_assigned_partitions
                   num_max_capacity_members++
               else
                   C = C_f - num_assigned_partitions
           pop the first C partitions from unassigned_partitions and assign to member
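      Here is a minimal Java sketch of the two phases, for readers who want to run them. It rests on these simplifying assumptions:

      - Partitions are strings.
      - There is at least one member.
      - Owned lists are already filtered to the current subscription, with stale generations dropped.
      - No partition appears in two owned lists.
      - The partitions list is already in the desired order.
      - All names are hypothetical.

      The sketch also adds one safeguard that the pseudocode does not spell out. Members kept at exactly C_f in phase 1 may receive one extra partition while fewer than C_r members are at C_c. Without it, a leftover partition would stay unassigned. For example, this happens with a newly created partition when every member already owns exactly C_f.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal sketch of the two-phase constrained assignment; all names are hypothetical.
final class TwoPhaseAssignSketch {

    static Map<String, List<String>> assign(Map<String, List<String>> owned, List<String> partitions) {
        int numMembers = owned.size();
        int minQuota = partitions.size() / numMembers;                    // C_f
        int maxQuota = (partitions.size() + numMembers - 1) / numMembers; // C_c
        int expectedMaxMembers = partitions.size() % numMembers;          // C_r
        int numMaxMembers = 0;

        List<String> unassigned = new ArrayList<>(partitions);
        List<String> unfilled = new ArrayList<>();   // members below C_f after phase 1
        List<String> atMinQuota = new ArrayList<>(); // members kept at exactly C_f
        Map<String, List<String>> assignment = new TreeMap<>(); // sorted by member id

        // Phase 1: keep previously owned partitions, up to C_c for at most C_r members
        for (Map.Entry<String, List<String>> entry : new TreeMap<>(owned).entrySet()) {
            String member = entry.getKey();
            List<String> ownedPartitions = entry.getValue();
            int keep;
            if (ownedPartitions.size() < minQuota) {
                keep = ownedPartitions.size();
                unfilled.add(member);
            } else if (ownedPartitions.size() >= maxQuota && numMaxMembers < expectedMaxMembers) {
                keep = maxQuota;
                numMaxMembers++;
            } else {
                keep = minQuota;
                atMinQuota.add(member);
            }
            // subList is a view, so copy it before storing
            List<String> kept = new ArrayList<>(ownedPartitions.subList(0, keep));
            unassigned.removeAll(kept);
            assignment.put(member, kept);
        }

        // Phase 2: fill unfilled members up to C_c while fewer than C_r members have C_c, else up to C_f
        for (String member : unfilled) {
            List<String> current = assignment.get(member);
            int capacity = minQuota;
            if (numMaxMembers < expectedMaxMembers) {
                capacity = maxQuota;
                numMaxMembers++;
            }
            while (current.size() < capacity && !unassigned.isEmpty()) {
                current.add(unassigned.remove(0));
            }
        }

        // Safeguard (not in the pseudocode): top up members sitting at exactly C_f
        for (String member : atMinQuota) {
            if (unassigned.isEmpty() || numMaxMembers >= expectedMaxMembers) {
                break;
            }
            assignment.get(member).add(unassigned.remove(0));
            numMaxMembers++;
        }
        return assignment;
    }
}
```

      On the example, with C0 owning t1p0 to t1p4, C1 owning t1p5 to t1p9 and C2 owning nothing, this sketch yields the following. That matches the walkthrough below:

      C0: t1p0, t1p1, t1p2, t1p3
      C1: t1p5, t1p6, t1p7
      C2: t1p4, t1p8, t1p9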

       

      With the updated algorithm, the previous example becomes:

      Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1, ... t1p9

      Suppose the current assignment is:

      C0: t1p0, t1p1, t1p2, t1p3, t1p4
      C1: t1p5, t1p6, t1p7, t1p8, t1p9

      Now a new consumer, C2, joins, so we do the following:

      1. Reassign as many previously owned partitions as possible, up to maxQuota, while respecting numMaxQuota
        After this phase, the assignment will be (maxQuota is 4 and numMaxQuota is 1):

      C0: t1p0, t1p1, t1p2, t1p3
      C1: t1p5, t1p6, t1p7

      2. Fill remaining members up to maxQuota if fewer than numMaxQuota members are at maxQuota; otherwise, fill them up to minQuota
        After this phase, the assignment will be:

      C0: t1p0, t1p1, t1p2, t1p3
      C1: t1p5, t1p6, t1p7
      C2: t1p4, t1p8, t1p9

       

      Another enhancement:

      Currently, in phase 1, we loop through each consumer's owned partitions one by one to either assign or revoke each partition. However, we don't care about the individual partitions; all we care about is how many are assigned. So we can simply use List.subList() to take the expected number of partitions from each owned-partition list in consumerToOwnedPartitions. This should also improve the performance of the algorithm.

       

      That is:

      // Reassign as many previously owned partitions as possible
      for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
          String consumer = consumerEntry.getKey();
          List<TopicPartition> ownedPartitions = consumerEntry.getValue();
          List<TopicPartition> consumerAssignment = assignment.get(consumer);
          // Originally, we looped through the owned partitions one by one, assigning the first
          // maxQuota partitions and marking the remaining ones as revoked:
      /*  int i = 0;
          for (TopicPartition tp : ownedPartitions) {
              if (i < maxQuota) {
                  consumerAssignment.add(tp);
                  unassignedPartitions.remove(tp);
              } else {
                  allRevokedPartitions.add(tp);
              }
              ++i;
          } */
          // Enhancement: since we only care about the number of partitions assigned, we can use
          // subList() to take the expected number directly instead of looping through them all
          if (ownedPartitions.size() < minQuota) {
              // the consumer owns fewer partitions than the expected assignment size, so keep all
              // of its owned partitions and add it to the unfilled member list
              consumerAssignment.addAll(ownedPartitions);
              unassignedPartitions.removeAll(ownedPartitions);
              unfilledMembers.add(consumer);
          } else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
              // the consumer owns maxQuota partitions or more, and we are still under the expected number
              // of max capacity members, so keep maxQuota of the owned partitions and revoke the rest
              consumerAssignment.addAll(ownedPartitions.subList(0, maxQuota));
              unassignedPartitions.removeAll(ownedPartitions.subList(0, maxQuota));
              allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
              numMaxCapacityMembers++;
          } else {
              // the consumer owns minQuota partitions or more, so keep minQuota
              // of the owned partitions and revoke the rest
              consumerAssignment.addAll(ownedPartitions.subList(0, minQuota));
              unassignedPartitions.removeAll(ownedPartitions.subList(0, minQuota));
              allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
          }

          .....
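      To check that the subList() version keeps and revokes the same partitions as the original loop, here is a small self-contained comparison on plain strings. The class and method names are hypothetical. subList() returns a view backed by the original list, so the sketch copies each range into a new ArrayList.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: split a member's owned partitions into [kept, revoked].
final class SubListSketch {

    // Original approach: walk the owned partitions one at a time.
    static List<List<String>> splitByLoop(List<String> owned, int quota) {
        List<String> kept = new ArrayList<>();
        List<String> revoked = new ArrayList<>();
        int i = 0;
        for (String tp : owned) {
            if (i < quota) {
                kept.add(tp);
            } else {
                revoked.add(tp);
            }
            ++i;
        }
        return List.of(kept, revoked);
    }

    // Enhancement: take both ranges directly with subList (copied, since subList is a view).
    static List<List<String>> splitBySubList(List<String> owned, int quota) {
        int cut = Math.min(quota, owned.size());
        List<String> kept = new ArrayList<>(owned.subList(0, cut));
        List<String> revoked = new ArrayList<>(owned.subList(cut, owned.size()));
        return List.of(kept, revoked);
    }
}
```

      subList() throws IndexOutOfBoundsException when the end index exceeds the list size. The Math.min guard here plays the same role as the size checks in the excerpt. That excerpt calls subList(0, maxQuota) only when the member owns at least maxQuota partitions, and subList(0, minQuota) only when it owns at least minQuota.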
      

       


          People

            Assignee: Luke Chen (showuon)
            Reporter: Luke Chen (showuon)