  1. Kafka
  2. KAFKA-12495

Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.4.0
    • Component/s: connect
    • Labels: None

    Description

      In Kafka Connect, we implement the incremental cooperative rebalance algorithm based on KIP-415 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect). However, the implementation makes a bad assumption: after a revoking rebalance completes, the member (worker) count will be the same as in the previous round of rebalance.

       

      Let's take a look at the example in KIP-415:

      It works well in most cases. But what happens if W4 is added after the 1st rebalance completes and before the 2nd rebalance starts? Consider this example (we'll use 10 tasks here):

       

      Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
      Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
      W1 is current leader
      W2 joins with assignment: []
      Rebalance is triggered
      W3 joins while rebalance is still active with assignment: []
      W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
      W1 becomes leader
      W1 computes and sends assignments:
      W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
      W2(delay: 0, assigned: [], revoked: [])
      W3(delay: 0, assigned: [], revoked: [])
      
      W1 stops revoked resources
      W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
      Rebalance is triggered
      W2 joins with assignment: []
      W3 joins with assignment: []
      
      // one more member joined
      W4 joins with assignment: []
      W1 becomes leader
      W1 computes and sends assignments:
      
      // We assigned all the previously revoked connectors/tasks to the new members, but we didn't revoke any more C/T in this round, which causes an unbalanced distribution
      W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
      W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
      W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
      W4(delay: 0, assigned: [BT4, BT5], revoked: [])
      

      Because we don't allow revocations in two consecutive rebalances (under the same leader), we end up with this uneven distribution in this situation. We should allow a consecutive rebalance to perform another round of revocation, so that the revoked C/T can be reassigned to the other members.
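
      The effect of that rule can be reproduced with a small toy model. This is a minimal sketch, not the actual assignor code in Kafka Connect: `rebalance_round`, `allow_revocation` and `loads` are hypothetical names, the balancing target is simply the ceiling of items per worker, and the ten tasks are assumed to be AT1-AT5 and BT1-BT5. Items revoked in a round are only handed out in the following round, as in the walkthrough.

```python
from math import ceil


def rebalance_round(assignments, unassigned, allow_revocation):
    """Run one toy rebalance round.

    assignments: dict worker -> list of connectors/tasks it currently runs
    unassigned:  list of connectors/tasks nobody runs (e.g. revoked last round)
    Returns (new_assignments, revoked), where revoked maps worker -> list.
    """
    total = sum(len(items) for items in assignments.values()) + len(unassigned)
    ceiling = ceil(total / len(assignments))
    new = {w: list(items) for w, items in assignments.items()}
    revoked = {w: [] for w in assignments}

    # Revoke anything above the per-worker ceiling, if this round may revoke.
    if allow_revocation:
        for w, items in new.items():
            while len(items) > ceiling:
                revoked[w].append(items.pop())

    # Hand each unassigned item to the currently least-loaded worker.
    # Items revoked in this round are not reassigned until the next round.
    for item in unassigned:
        target = min(new, key=lambda w: len(new[w]))
        new[target].append(item)
    return new, revoked


def loads(assignments):
    return {w: len(items) for w, items in assignments.items()}


# State at the start of the second rebalance, after W4 has joined.
start = {"W1": ["AC0", "AT1", "AT2", "AT3"], "W2": [], "W3": [], "W4": []}
pending = ["AT4", "AT5", "BC0", "BT1", "BT2", "BT3", "BT4", "BT5"]

# Reported behavior: the second consecutive round may not revoke.
stuck, _ = rebalance_round(start, pending, allow_revocation=False)

# Proposed behavior: revoke again, then reassign in a follow-up round.
step, revoked = rebalance_round(start, pending, allow_revocation=True)
leftover = [item for items in revoked.values() for item in items]
final, _ = rebalance_round(step, leftover, allow_revocation=True)

print(loads(stuck))  # {'W1': 4, 'W2': 3, 'W3': 3, 'W4': 2}
print(loads(final))  # {'W1': 3, 'W2': 3, 'W3': 3, 'W4': 3}
```

      The sketch only tracks per-worker counts; which concrete C/T lands on which worker differs from the walkthrough because the toy model hands out items one at a time.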

      expected:

      Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
      Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
      W1 is current leader
      W2 joins with assignment: []
      Rebalance is triggered
      W3 joins while rebalance is still active with assignment: []
      W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
      W1 becomes leader
      W1 computes and sends assignments:
      W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
      W2(delay: 0, assigned: [], revoked: [])
      W3(delay: 0, assigned: [], revoked: [])
      
      W1 stops revoked resources
      W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
      Rebalance is triggered
      W2 joins with assignment: []
      W3 joins with assignment: []
      
      // one more member joined
      W4 joins with assignment: []
      W1 becomes leader
      W1 computes and sends assignments:
      
      // We assign all the previously revoked connectors/tasks to the new members, **and also revoke some C/T**
      W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
      W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
      W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
      W4(delay: 0, assigned: [BT4, BT5], revoked: [])
      
      // another round of rebalance to assign the newly revoked C/T to the other members
      W1 rejoins with assignment: [AC0, AT1, AT2]
      Rebalance is triggered
      W2 joins with assignment: [AT4, AT5, BC0]
      W3 joins with assignment: [BT1, BT2, BT3]
      W4 joins with assignment: [BT4, BT5]
      
      W1 becomes leader 
      W1 computes and sends assignments:
      
      // (final) All the previously revoked connectors/tasks are now assigned to the members
      W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [])
      W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
      W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
      W4(delay: 0, assigned: [BT4, BT5, AT3], revoked: [])
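
      One way to state what "balanced" means for the two outcomes is a small check like the one below. It is a hedged sketch: `check_assignment` is a hypothetical helper, not part of Kafka Connect, and it assumes the ten tasks are AT1-AT5 and BT1-BT5 and that a distribution counts as balanced when worker loads differ by at most one.

```python
def check_assignment(config_items, assignments):
    """Return (complete, balanced) for a worker -> items mapping.

    complete: every configured connector/task is assigned exactly once
    balanced: the largest and smallest worker loads differ by at most one
    """
    assigned = [item for items in assignments.values() for item in items]
    complete = sorted(assigned) == sorted(config_items)
    sizes = [len(items) for items in assignments.values()]
    balanced = max(sizes) - min(sizes) <= 1
    return complete, balanced


config = ["AC0", "AT1", "AT2", "AT3", "AT4", "AT5",
          "BC0", "BT1", "BT2", "BT3", "BT4", "BT5"]

# Outcome without a second round of revocation.
observed = {
    "W1": ["AC0", "AT1", "AT2", "AT3"],
    "W2": ["AT4", "AT5", "BC0"],
    "W3": ["BT1", "BT2", "BT3"],
    "W4": ["BT4", "BT5"],
}

# Outcome after W1 revokes AT3 and it is reassigned to W4.
expected = {
    "W1": ["AC0", "AT1", "AT2"],
    "W2": ["AT4", "AT5", "BC0"],
    "W3": ["BT1", "BT2", "BT3"],
    "W4": ["BT4", "BT5", "AT3"],
}

print(check_assignment(config, observed))  # (True, False)
print(check_assignment(config, expected))  # (True, True)
```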
      

      Note: The consumer's cooperative sticky assignor won't have this issue since we re-compute the assignment in each round.

       

      Note 2: This issue makes the KAFKA-12283 test flaky.

      Attachments

        1. image-2021-03-18-15-07-27-103.png
          111 kB
          Luke Chen
        2. image-2021-03-18-15-05-52-557.png
          145 kB
          Luke Chen
        3. image-2021-03-18-15-04-57-854.png
          145 kB
          Luke Chen

            People

              Assignee: Sagar Rao (sagarrao)
              Reporter: Luke Chen (showuon)
              Votes: 6
              Watchers: 10
