Description
The group coordinator might incorrectly delete offsets during a group rebalance. During a rebalance, the coordinator relies on the last commit timestamp (offsetAndMetadata.commitTimestamp) instead of the last state modification timestamp (currentStateTimestamp) to detect expired offsets.
This is relatively easy to reproduce by adjusting group.initial.rebalance.delay.ms, offsets.retention.minutes and offsets.retention.check.interval.ms. I uploaded an example at: https://github.com/Dabz/kafka-example/tree/master/docker/offsets-retention .
The script does the following:
- Starts a broker with: offsets.retention.minutes=2, offsets.retention.check.interval.ms=1000, group.initial.rebalance.delay.ms=20000
- Produces 10 messages
- Creates a consumer group to consume the 10 messages, with enable.auto.commit disabled so that offsets are committed only a few times
- Waits 3 minutes, then kills the consumer with kill -9
- Restarts the consumer after a few seconds
- The consumer restarts from auto.offset.reset because its committed offsets were removed
The cause is in GroupMetadata.scala:
- When the group becomes empty, subscribedTopics is set to Set.empty (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L520-L521)
- When a new member joins, it is added to the group right away, but subscribedTopics is only updated once the rebalance is over (in initNewGeneration), which can take a while because of group.initial.rebalance.delay.ms
- When the offset retention check runs, subscribedTopics.isDefined returns true because Set.empty != None (the underlying condition)
- Thus we enter https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L782-L785 with an empty subscribedTopics set and rely on commitTimestamp regardless of currentStateTimestamp
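The sequence above can be illustrated with a small Python model. This is only a sketch of the behaviour as described in this report, not the actual Scala code: GroupState, expired_topics, the topic name demo-topic and the 190s/195s/196s timestamps are all made up for illustration. Only the 2 minute retention comes from the reproduction setup.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Set


@dataclass
class GroupState:
    """Toy stand-in for the coordinator's view of a group (hypothetical)."""
    has_members: bool
    # None models Scala's None; set() models Some(Set.empty).
    subscribed_topics: Optional[Set[str]]
    current_state_timestamp_ms: int
    # topic -> timestamp (ms) of the last offset commit for that topic
    commit_timestamps_ms: Dict[str, int] = field(default_factory=dict)


def expired_topics(group: GroupState, now_ms: int, retention_ms: int) -> Set[str]:
    """Return the topics whose committed offsets would be treated as expired."""
    if not group.has_members:
        # Empty group: retention is measured from the last state change.
        if now_ms - group.current_state_timestamp_ms >= retention_ms:
            return set(group.commit_timestamps_ms)
        return set()
    if group.subscribed_topics is not None:
        # Group with members and a defined subscription: offsets of topics that
        # are not subscribed expire based on their commit timestamp.
        return {
            topic
            for topic, committed_ms in group.commit_timestamps_ms.items()
            if topic not in group.subscribed_topics
            and now_ms - committed_ms >= retention_ms
        }
    return set()


RETENTION_MS = 2 * 60 * 1000  # offsets.retention.minutes=2
commits = {"demo-topic": 0}   # last commit at t=0 (assumed)

# t=190s: consumer was killed, group is empty, subscription reset to an empty set.
empty = GroupState(False, set(), 190_000, commits)
while_empty = expired_topics(empty, now_ms=194_000, retention_ms=RETENTION_MS)

# t=196s: consumer has rejoined, but the rebalance delay has not elapsed yet,
# so the subscription is still the empty set.
rejoining = GroupState(True, set(), 195_000, commits)
during_rejoin = expired_topics(rejoining, now_ms=196_000, retention_ms=RETENTION_MS)

# Same instant, if the subscription had already been refreshed.
refreshed = GroupState(True, {"demo-topic"}, 195_000, commits)
after_refresh = expired_topics(refreshed, now_ms=196_000, retention_ms=RETENTION_MS)

print(while_empty, during_rejoin, after_refresh)
```

In this model, only the rejoining state reports demo-topic as expired: the subscription is defined but empty, so the check falls back to the commit timestamp, which is already older than the retention period.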
This seems to be a regression introduced by KIP-496 https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets#KIP496:AdministrativeAPItodeleteconsumeroffsets-ProposedChanges (KAFKA-8338, KAFKA-8730)
Issue Links
- is caused by
  - KAFKA-8338 Improve consumer offset expiration logic to take subscription into account (Resolved)
  - KAFKA-8730 Add API to delete consumer offsets (KIP-496) (Resolved)