Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version/s: 3.7.0
Labels: None
Description
The issue is reproducible with a simple CLI tool: https://gist.github.com/mlowicki/c3b942f5545faced93dc414e01a2da70
#!/usr/bin/env bash

for i in {1..100}
do
  kafka-committer --bootstrap "ADDR:9092" --topic "TOPIC" --group foo --metadata-min 6000 --metadata-max 10000 --partitions 72 --fetch
done
What it does is initially fetch the committed offsets and then try to commit offsets for multiple partitions (a rough sketch of the commit step is included after the error output below). If some of the commits have metadata over the allowed limit, then:
1. I see errors about too-large commits (expected).
2. On the next run, the tool fails at the stage of fetching committed offsets with the following (this is the problem):
config: ClientConfig {
    conf_map: {
        "group.id": "bar",
        "bootstrap.servers": "ADDR:9092",
    },
    log_level: Error,
}
fetching committed offsets..
Error: Meta data fetch error: OperationTimedOut (Local: Timed out)

Caused by:
    OperationTimedOut (Local: Timed out)
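For reference, a rough sketch of the commit step written against the plain Java consumer API in Scala (this is only my assumption of what kafka-committer does internally, not its actual code; the topic, group, partition count and metadata size mirror the flags above):

import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object OversizedMetadataCommit {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "ADDR:9092")
    props.put("group.id", "foo")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      // Metadata longer than the broker's offset.metadata.max.bytes limit, so the broker
      // rejects the commit for these partitions with OFFSET_METADATA_TOO_LARGE.
      val oversized = "x" * 6000
      val offsets = (0 until 72).map { p =>
        new TopicPartition("TOPIC", p) -> new OffsetAndMetadata(0L, oversized)
      }.toMap.asJava
      consumer.commitSync(offsets)
    } finally {
      consumer.close()
    }
  }
}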
On the Kafka side I see unstable_offset_commits errors reported by our internal metric, which is derived from:
kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=X,error=Y
Increasing the timeout doesn't help, and the only workarounds I've found are to trigger commits for all partitions with metadata below the limit, or to use:
isolation.level=read_uncommitted
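i.e. something along these lines when fetching the committed offsets (a sketch using the Java consumer; with read_uncommitted the consumer does not request stable offsets in the OffsetFetch, so it isn't held up by the stuck pending commits):

import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object FetchCommittedOffsets {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "ADDR:9092")
    props.put("group.id", "foo")
    // Workaround: don't require stable offsets, so the fetch isn't blocked by pending commits.
    props.put("isolation.level", "read_uncommitted")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      val partitions = (0 until 72).map(new TopicPartition("TOPIC", _)).toSet.asJava
      consumer.committed(partitions).asScala.foreach { case (tp, offset) =>
        println(s"$tp -> $offset")
      }
    } finally {
      consumer.close()
    }
  }
}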
I don't know that code very well, but https://github.com/apache/kafka/blob/3.7/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L492-L496 seems fishy:
if (isTxnOffsetCommit) {
  addProducerGroup(producerId, group.groupId)
  group.prepareTxnOffsetCommit(producerId, offsetMetadata)
} else {
  group.prepareOffsetCommit(offsetMetadata)
}
as it's using offsetMetadata and not filteredOffsetMetadata, while the code that removes those pending commits uses the filtered offset metadata around https://github.com/apache/kafka/blob/3.7/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L397-L422:
val responseError = group.inLock {
  if (status.error == Errors.NONE) {
    if (!group.is(Dead)) {
      filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
        if (isTxnOffsetCommit)
          group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
        else
          group.onOffsetCommitAppend(topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
      }
    }

    // Record the number of offsets committed to the log
    offsetCommitsSensor.record(records.size)

    Errors.NONE
  } else {
    if (!group.is(Dead)) {
      if (!group.hasPendingOffsetCommitsFromProducer(producerId))
        removeProducerGroup(producerId, group.groupId)

      filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
        if (isTxnOffsetCommit)
          group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
        else
          group.failPendingOffsetWrite(topicIdPartition, offsetAndMetadata)
      }
    }
so the problem might be that the data structure holding pending commits is not cleaned up properly.
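If that's indeed the cause, I'd expect the fix to look roughly like the following (just a sketch on my side, assuming filteredOffsetMetadata can be passed to prepareOffsetCommit/prepareTxnOffsetCommit the same way offsetMetadata is; I haven't verified this against the actual change):

if (isTxnOffsetCommit) {
  addProducerGroup(producerId, group.groupId)
  // Stage only the offsets that passed validation, so every pending entry added here
  // is later either completed or failed by the callback that iterates filteredOffsetMetadata.
  group.prepareTxnOffsetCommit(producerId, filteredOffsetMetadata)
} else {
  group.prepareOffsetCommit(filteredOffsetMetadata)
}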