Details
Bug
Status: Resolved
Blocker
Resolution: Fixed
2.4.0
None
None
Description
The current ordering of operations in KafkaConsumer.unsubscribe is the following:
this.subscriptions.unsubscribe();
this.coordinator.onLeavePrepare();
this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
Inside onLeavePrepare we look at the current assignment, try to revoke those partitions by notifying the user via ConsumerRebalanceListener#onPartitionsRevoked, and then clear the assignment.
However, the subscription's assignment has already been cleared by this.subscriptions.unsubscribe(), which means the user's rebalance listener is never triggered. In other words, from the consumer client's point of view nothing is owned after unsubscribe, but from the caller's point of view the partitions have not been revoked yet. For callers like Kafka Streams, which rely on the rebalance listener to maintain their internal state, this leads to inconsistent state management and failures.
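The ordering problem can be reproduced with a toy model. This is only a sketch: the class `OldUnsubscribeOrderSketch` and its methods are hypothetical stand-ins, not the real KafkaConsumer internals, and it assumes the callback step only reports partitions that are still in the assignment.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model of the ordering problem; not the real KafkaConsumer internals.
class OldUnsubscribeOrderSketch {
    // Hypothetical stand-in for the consumer's assignment state.
    final Set<String> assignment = new LinkedHashSet<>();
    // Records what the user's rebalance listener would have been told.
    final List<String> revokedCallbacks = new ArrayList<>();

    OldUnsubscribeOrderSketch(String... partitions) {
        for (String p : partitions) {
            assignment.add(p);
        }
    }

    // Stand-in for subscriptions.unsubscribe(): drops the assignment silently.
    void clearSubscription() {
        assignment.clear();
    }

    // Stand-in for coordinator.onLeavePrepare(): notifies only about
    // partitions that are still assigned, then clears them.
    void onLeavePrepare() {
        if (!assignment.isEmpty()) {
            revokedCallbacks.add("revoked " + assignment);
            assignment.clear();
        }
    }

    // The ordering from the description: the assignment is cleared first,
    // so the callback step finds nothing left to revoke.
    void unsubscribe() {
        clearSubscription();
        onLeavePrepare();
    }

    public static void main(String[] args) {
        OldUnsubscribeOrderSketch consumer =
            new OldUnsubscribeOrderSketch("topic-0", "topic-1");
        consumer.unsubscribe();
        // prints "callbacks fired: 0"
        System.out.println("callbacks fired: " + consumer.revokedCallbacks.size());
    }
}
```

In this model the consumer owns nothing after `unsubscribe`, yet the listener was never told, which is the mismatch Kafka Streams runs into.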
Before KIP-429 this issue was hidden, since every time the consumer re-joined the group later it would revoke everything anyway, regardless of the parameters passed to the rebalance listener. With KIP-429 it is easier to reproduce.
I think we can summarize our fix as:
• Inside `unsubscribe`, first call `onLeavePrepare` / `maybeLeaveGroup` and then `subscriptions.unsubscribe`. This way we are guaranteed that the Streams tasks have all been closed as revoked by then.
• [Optimization] If the generation has been reset due to a fatal error in a join or heartbeat response, etc., then we know that all partitions are lost, and we should trigger only `onPartitionsLost` inside `onLeavePrepare`, not `onPartitionsRevoked`.
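The two points above could be sketched roughly as follows. This is a toy model, not the actual patch: `FixedUnsubscribeOrderSketch`, `Listener`, `RecordingListener`, `ToyConsumer`, and the `generationReset` flag are all hypothetical names chosen for illustration, and the real coordinator tracks generation state differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model of the proposed ordering; not the real Kafka classes.
class FixedUnsubscribeOrderSketch {

    // Hypothetical stand-in for the user's rebalance listener.
    interface Listener {
        void onPartitionsRevoked(Set<String> partitions);
        void onPartitionsLost(Set<String> partitions);
    }

    // Records callbacks so they can be inspected afterwards.
    static class RecordingListener implements Listener {
        final List<String> events = new ArrayList<>();

        public void onPartitionsRevoked(Set<String> partitions) {
            events.add("revoked" + partitions);
        }

        public void onPartitionsLost(Set<String> partitions) {
            events.add("lost" + partitions);
        }
    }

    static class ToyConsumer {
        final Set<String> assignment = new LinkedHashSet<>();
        final Listener listener;
        // Assumption: set when a fatal join/heartbeat error reset the generation.
        boolean generationReset = false;
        boolean leftGroup = false;

        ToyConsumer(Listener listener, String... partitions) {
            this.listener = listener;
            for (String p : partitions) {
                assignment.add(p);
            }
        }

        // Notify the listener about owned partitions, then clear them.
        void onLeavePrepare() {
            if (assignment.isEmpty()) {
                return;
            }
            Set<String> owned = new LinkedHashSet<>(assignment);
            if (generationReset) {
                listener.onPartitionsLost(owned);    // optimization from the second bullet
            } else {
                listener.onPartitionsRevoked(owned);
            }
            assignment.clear();
        }

        void maybeLeaveGroup() {
            leftGroup = true;
        }

        // Proposed ordering: callbacks and leave-group first, state reset last.
        void unsubscribe() {
            onLeavePrepare();
            maybeLeaveGroup();
            assignment.clear(); // stand-in for subscriptions.unsubscribe()
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        new ToyConsumer(listener, "topic-0", "topic-1").unsubscribe();
        System.out.println(listener.events);
    }
}
```

With this ordering the listener sees the owned partitions before they are cleared, and a consumer whose generation was reset reports them as lost instead of revoked.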