Kafka / KAFKA-16371

Unstable committed offsets after triggering commits where metadata for some partitions are over the limit


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.7.0
    • Fix Version/s: 3.8.0, 3.7.1
    • Component/s: offset manager
    • Labels: None

    Description

      The issue is reproducible with a simple CLI tool: https://gist.github.com/mlowicki/c3b942f5545faced93dc414e01a2da70

      #!/usr/bin/env bash
      
      for i in {1..100}
      do
          kafka-committer --bootstrap "ADDR:9092" --topic "TOPIC" --group foo --metadata-min 6000 --metadata-max 10000 --partitions 72 --fetch
      done

      The tool first fetches committed offsets and then tries to commit offsets for multiple partitions. If some of the commits have metadata over the allowed limit, then:
      1. I see errors about too-large commits (expected)

      2. On the next run, the tool fails at the stage of fetching committed offsets (this is the problem; see the sketch after the error output below):

      config: ClientConfig { conf_map: { "group.id": "bar", "bootstrap.servers": "ADDR:9092", }, log_level: Error, }
      fetching committed offsets..
      Error: Meta data fetch error: OperationTimedOut (Local: Timed out) Caused by: OperationTimedOut (Local: Timed out)
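
      For reference, here is a minimal sketch of the same reproduction against the plain Java consumer API (written in Scala). It is only an approximation of what kafka-committer does; the address, topic, group and partition count are the placeholders from the command line above, and the sketch assumes the default broker limit offsets.metadata.max.bytes = 4096, so every other partition is given metadata above it:

      import java.time.Duration
      import java.util.Properties
      import scala.jdk.CollectionConverters._
      
      import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
      import org.apache.kafka.common.TopicPartition
      import org.apache.kafka.common.serialization.ByteArrayDeserializer
      
      object UnstableOffsetsRepro {
      
        def newConsumer(extra: (String, String)*): KafkaConsumer[Array[Byte], Array[Byte]] = {
          val props = new Properties()
          props.put("bootstrap.servers", "ADDR:9092") // placeholders from the report
          props.put("group.id", "foo")
          props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
          props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)
          extra.foreach { case (k, v) => props.put(k, v) }
          new KafkaConsumer[Array[Byte], Array[Byte]](props)
        }
      
        def main(args: Array[String]): Unit = {
          val partitions = (0 until 72).map(p => new TopicPartition("TOPIC", p))
      
          // Step 1: commit offsets for all partitions, giving every other partition
          // metadata above offsets.metadata.max.bytes (4096 by default), so those
          // individual commits are rejected with OFFSET_METADATA_TOO_LARGE while
          // the rest are written to the offsets topic.
          val committer = newConsumer()
          val offsets = partitions.zipWithIndex.map { case (tp, i) =>
            val metadataSize = if (i % 2 == 0) 6000 else 100
            tp -> new OffsetAndMetadata(0L, "m" * metadataSize)
          }.toMap
          try committer.commitSync(offsets.asJava)
          catch { case e: Exception => println(s"commit failed as expected: $e") }
          committer.close()
      
          // Step 2: fetch the committed offsets requiring stable offsets
          // (isolation.level=read_committed). In the reported scenario this call
          // keeps getting unstable-offset errors from the broker and eventually
          // times out instead of returning the offsets that were committed.
          val fetcher = newConsumer("isolation.level" -> "read_committed")
          println(fetcher.committed(partitions.toSet.asJava, Duration.ofSeconds(30)))
          fetcher.close()
        }
      }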

      On the Kafka side I see unstable_offset_commits errors reported by our internal metric, which is derived from:

       kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=X,error=Y
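
      For an offset fetch blocked by unstable offsets, the concrete instance should be (assuming the standard request and error names):

       kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=OffsetFetch,error=UNSTABLE_OFFSET_COMMIT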

      Increasing the timeout doesn't help, and the only solutions I've found are to trigger commits for all partitions with metadata below the limit, or to use:

      isolation.level=read_uncommitted
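
      With the Java client, the read_uncommitted workaround corresponds to something like the sketch below (again just an illustration with the same placeholder names; with read_uncommitted the consumer's OffsetFetch should not ask the broker for stable offsets, so the pending entries don't block it):

      import java.time.Duration
      import java.util.Properties
      import scala.jdk.CollectionConverters._
      
      import org.apache.kafka.clients.consumer.KafkaConsumer
      import org.apache.kafka.common.TopicPartition
      import org.apache.kafka.common.serialization.ByteArrayDeserializer
      
      val props = new Properties()
      props.put("bootstrap.servers", "ADDR:9092")      // placeholders from the report
      props.put("group.id", "foo")
      props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
      props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)
      props.put("isolation.level", "read_uncommitted") // the workaround
      
      val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
      val partitions = (0 until 72).map(p => new TopicPartition("TOPIC", p)).toSet.asJava
      println(consumer.committed(partitions, Duration.ofSeconds(30)))
      consumer.close()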

       

      I don't know that code very well, but https://github.com/apache/kafka/blob/3.7/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L492-L496 seems fishy:

          if (isTxnOffsetCommit) {
            addProducerGroup(producerId, group.groupId)
            group.prepareTxnOffsetCommit(producerId, offsetMetadata)
          } else {
            group.prepareOffsetCommit(offsetMetadata)
          }

      as it's using offsetMetadata rather than filteredOffsetMetadata, while the code that removes those pending commits uses filteredOffsetMetadata, around https://github.com/apache/kafka/blob/3.7/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L397-L422:

            val responseError = group.inLock {
              if (status.error == Errors.NONE) {
                if (!group.is(Dead)) {
                  filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
                    if (isTxnOffsetCommit)
                      group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
                    else
                      group.onOffsetCommitAppend(topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
                  }
                }
                // Record the number of offsets committed to the log
                offsetCommitsSensor.record(records.size)
                Errors.NONE
              } else {
                if (!group.is(Dead)) {
                  if (!group.hasPendingOffsetCommitsFromProducer(producerId))
                    removeProducerGroup(producerId, group.groupId)
                  filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
                    if (isTxnOffsetCommit)
                      group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
                    else
                      group.failPendingOffsetWrite(topicIdPartition, offsetAndMetadata)
                  }
                }
      

      so the problem might be that the data structure holding pending offset commits is not cleaned up properly.
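
      If that hypothesis is right, the kind of change it suggests in GroupMetadataManager.storeOffsets is to track as pending only the entries that survived the metadata-size validation, roughly (just a sketch of the hypothesis, not necessarily the change that was merged):

          if (isTxnOffsetCommit) {
            addProducerGroup(producerId, group.groupId)
            // use the validated subset so that the pending-commit bookkeeping matches
            // what onOffsetCommitAppend / failPendingOffsetWrite clean up later
            group.prepareTxnOffsetCommit(producerId, filteredOffsetMetadata)
          } else {
            group.prepareOffsetCommit(filteredOffsetMetadata)
          }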

          People

            Assignee: David Jacot (dajac)
            Reporter: mlowicki