Description
In Kafka Connect, we implement the incremental cooperative rebalance algorithm based on KIP-415 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect). However, the implementation makes a bad assumption: after the revoking rebalance completes, the member (worker) count will be the same as in the previous round of rebalance.
Let's take a look at the example in KIP-415:
It works well for most cases. But what if W4 is added after the 1st rebalance completes and before the 2nd rebalance starts? Let's see what happens in this example (we'll use 10 tasks here, 5 for each of the two connectors):
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
    W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
    W2(delay: 0, assigned: [], revoked: [])
    W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
W3 joins with assignment: []
// one more member joined
W4 joins with assignment: []
W1 becomes leader
W1 computes and sends assignments:
// We assigned all the previously revoked Connectors/Tasks to the new members, but we didn't revoke any more C/T in this round, which causes an unbalanced distribution
    W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
    W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
    W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
    W4(delay: 0, assigned: [BT4, BT5], revoked: [])
Because we don't allow revocation in two consecutive rebalances (under the same leader), we end up with this uneven distribution in this situation. We should allow a consecutive rebalance to perform another round of revocation in this case, so that the revoked C/T can be reassigned to the other members.
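As a rough illustration of what another round of revocation would need to compute, the sketch below finds the resources each worker holds above the per-worker ceiling for the current member count. This is not the actual IncrementalCooperativeAssignor logic: the function name `excess_to_revoke` is hypothetical, and the sketch assumes connectors and tasks are counted together as in the example above.

```python
from math import ceil


def excess_to_revoke(assignments, total_resources):
    """Return, per worker, the resources it owns above the per-worker ceiling.

    assignments: dict mapping worker name -> list of owned connectors/tasks
                 (every current group member must be present, even if empty).
    total_resources: number of connectors/tasks in the config topic.
    """
    # With N members, no worker should own more than ceil(total / N) resources.
    ceiling = ceil(total_resources / len(assignments))
    return {
        worker: owned[ceiling:]
        for worker, owned in assignments.items()
        if len(owned) > ceiling
    }


# Second round of the example: W4 joined, so the ceiling drops from 4 to 3.
second_round = {"W1": ["AC0", "AT1", "AT2", "AT3"], "W2": [], "W3": [], "W4": []}
print(excess_to_revoke(second_round, 12))
```

With only W1, W2 and W3 in the group the ceiling stays at 4 and nothing more needs to be revoked, which is the case the current assumption covers.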
Expected:
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
    W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
    W2(delay: 0, assigned: [], revoked: [])
    W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
W3 joins with assignment: []
// one more member joined
W4 joins with assignment: []
W1 becomes leader
W1 computes and sends assignments:
// We assigned all the previously revoked Connectors/Tasks to the new members, **and also revoked some C/T**
    W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
    W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
    W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
    W4(delay: 0, assigned: [BT4, BT5], revoked: [])
// another round of rebalance to assign the newly revoked C/T to the other members
W1 rejoins with assignment: [AC0, AT1, AT2]
Rebalance is triggered
W2 joins with assignment: [AT4, AT5, BC0]
W3 joins with assignment: [BT1, BT2, BT3]
W4 joins with assignment: [BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
// (final) We assigned all the previously revoked Connectors/Tasks to the members
    W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [])
    W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
    W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
    W4(delay: 0, assigned: [BT4, BT5, AT3], revoked: [])
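The difference between the current and the expected behavior can be replayed with a small simulation. This is only a sketch under simplifying assumptions, not the Connect implementation: `rebalance_round` and `loads` are hypothetical helpers, connectors and tasks are balanced together, and unassigned resources go to the least-loaded worker, so the exact resources each worker receives may differ from the logs above even though the per-worker counts match.

```python
from math import ceil


def rebalance_round(owned, unassigned, allow_revocation):
    """Simulate one leader-side round; return (assigned, revoked) per worker."""
    total = sum(len(r) for r in owned.values()) + len(unassigned)
    ceiling = ceil(total / len(owned))
    assigned = {w: list(r) for w, r in owned.items()}
    revoked = {w: [] for w in owned}
    if allow_revocation:
        # Revoke whatever a worker holds above the per-worker ceiling.
        for w, r in assigned.items():
            if len(r) > ceiling:
                revoked[w] = r[ceiling:]
                assigned[w] = r[:ceiling]
    # Hand each unassigned resource to the currently least-loaded worker.
    for res in unassigned:
        target = min(assigned, key=lambda w: len(assigned[w]))
        assigned[target].append(res)
    return assigned, revoked


def loads(assigned):
    """Number of connectors/tasks per worker, in worker order."""
    return [len(r) for r in assigned.values()]


# State at the start of the second rebalance, after W4 has joined.
owned = {"W1": ["AC0", "AT1", "AT2", "AT3"], "W2": [], "W3": [], "W4": []}
pending = ["AT4", "AT5", "BC0", "BT1", "BT2", "BT3", "BT4", "BT5"]

# Current behavior: no consecutive revocation, so W1 keeps 4 resources.
current, _ = rebalance_round(owned, pending, allow_revocation=False)

# Expected behavior: revoke in this round, then reassign in a follow-up round.
second, revoked = rebalance_round(owned, pending, allow_revocation=True)
leftover = [res for r in revoked.values() for res in r]
final, _ = rebalance_round(second, leftover, allow_revocation=True)

print(loads(current), loads(second), loads(final))
```

The extra round costs one more rebalance, but it is what lets the group converge to an even distribution when membership changes between the revocation round and the assignment round.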
Note: The consumer's cooperative sticky assignor won't have this issue since we re-compute the assignment in each round.
Note 2: This issue makes the test in KAFKA-12283 flaky.
Issue Links
- blocks
  - KAFKA-12283 Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining (Resolved)
- is related to
  - KAFKA-8391 Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector (Resolved)
  - KAFKA-13763 Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor (Resolved)
  - KAFKA-13764 Potential improvements for Connect incremental rebalancing logic (Resolved)