Description
After the 2.1 release, if the broker hasn't been upgraded to the latest inter-broker protocol version,
the committed offsets stored in the __consumer_offsets topic will get cleaned up much earlier than they should be when the offsets are loaded back from the __consumer_offsets topic in the GroupCoordinator, which happens during a leadership transition or after a broker bounce.
TL;DR
For the V1 on-disk format of __consumer_offsets we have the expireTimestamp field, and if the inter-broker protocol (IBP) version is prior to 2.1 (i.e. prior to KIP-211) on a Kafka 2.1 broker, the logic for finding the expired offsets looks like:
def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = {
  offsets.filter { case (topicPartition, commitRecordMetadataAndOffset) =>
    ... && {
      commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
        case None =>
          // current version with no per partition retention
          currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
        case Some(expireTimestamp) =>
          // older versions with explicit expire_timestamp field => old expiration semantics is used
          currentTimestamp >= expireTimestamp
      }
    }
  }....
}
The expireTimestamp in the on-disk offset record can only be set when storing the committed offset in the __consumer_offsets topic. But the GroupCoordinator also keeps an in-memory representation of the expireTimestamp (see the code above), which can be set in the following two cases:
1) Upon the GroupCoordinator receiving an OffsetCommitRequest, the expireTimestamp is set using the following logic:
expireTimestamp = offsetCommitRequest.retentionTime match {
  case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
  case retentionTime => Some(currentTimestamp + retentionTime)
}
In all the latest client versions, the consumer sends the OffsetCommitRequest with DEFAULT_RETENTION_TIME, so the expireTimestamp will always be None in this case. This means any committed offset set in this case will always hit the "case None" branch in getExpiredOffsets(...) when the coordinator is doing the cleanup, which is correct.
2) Upon the GroupCoordinator loading the committed offsets stored in the __consumer_offsets topic from disk, the expireTimestamp is set using the following logic if IBP < 2.1:
val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
and the logic to persist the expireTimestamp is:
// OffsetCommitRequest.DEFAULT_TIMESTAMP = -1
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
Since the in-memory expireTimestamp will always be None in our case, as mentioned in 1), we will always store -1 on disk. Therefore, when the offset is loaded from the __consumer_offsets topic, the in-memory expireTimestamp will always be set to Some(-1). This means any committed offset set in this case will always hit the "case Some(expireTimestamp)" branch in getExpiredOffsets(...) when the coordinator is doing the cleanup, which means we will always expire these committed offsets on the first expiration check (which happens shortly after they are loaded from the __consumer_offsets topic).
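To make the consequence concrete, here is a small standalone illustration (not broker code; the variable names and retention value are made up for this sketch) of why a loaded value of Some(-1) is always treated as expired:

// Illustration only: what the expiration check sees for an offset loaded from disk.
val currentTimestamp: Long = System.currentTimeMillis()
val lastCommitTimestamp: Long = currentTimestamp         // a freshly committed offset
val offsetRetentionMs: Long = 7L * 24 * 60 * 60 * 1000   // assumed 7-day retention, for illustration
val expireTimestamp: Option[Long] = Some(-1L)             // what the coordinator ends up with after the load

val expired = expireTimestamp match {
  case None =>
    // retention path: a freshly committed offset is nowhere near expiration
    currentTimestamp - lastCommitTimestamp >= offsetRetentionMs
  case Some(ts) =>
    // old explicit-expire-timestamp path: -1 is always in the past, so this is always true
    currentTimestamp >= ts
}
// expired == true, so the offset is removed shortly after the load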
I am able to reproduce this bug on my local box with one broker using 2.1 and a 0.11.* consumer. The consumer will see a null committed offset after the broker is bounced.
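For reference, a minimal sketch of the kind of check that surfaces the problem (not my exact reproduction steps; the bootstrap server, topic name, group id and offset below are made up): commit an offset, bounce the broker so the coordinator reloads __consumer_offsets, then ask the consumer for the committed offset again.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object CommittedOffsetCheck extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")                   // assumed local single-broker setup
  props.put("group.id", "offset-expiration-check")                   // hypothetical group id
  props.put("key.deserializer", classOf[StringDeserializer].getName)
  props.put("value.deserializer", classOf[StringDeserializer].getName)

  val tp = new TopicPartition("test", 0)                             // hypothetical topic
  val consumer = new KafkaConsumer[String, String](props)
  consumer.assign(Collections.singletonList(tp))

  args.headOption match {
    case Some("commit") =>
      // Step 1: commit an offset, then bounce the broker so the coordinator
      // reloads __consumer_offsets.
      consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(42L)))
    case _ =>
      // Step 2 (after the bounce): on an affected broker this prints null,
      // because the loaded offset was expired on the first expiration check.
      println(consumer.committed(tp))
  }
  consumer.close()
}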
This bug was introduced by PR-5690 in the Kafka 2.1 release, and the fix is very straightforward: set the in-memory expireTimestamp to None if the on-disk value is -1.
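In other words, the load path should map the -1 sentinel back to None instead of carrying it into memory. A sketch of what that amounts to, mirroring the load snippet above (not necessarily the exact patch):

val rawExpireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
// -1 (OffsetCommitRequest.DEFAULT_TIMESTAMP) means no explicit per-offset expiration was stored,
// so map it back to None; only a real timestamp should keep the old expiration semantics.
val expireTimestamp: Option[Long] =
  if (rawExpireTimestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP) None
  else Some(rawExpireTimestamp)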