  Kafka / KAFKA-12926

ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.0.0
    • Component/s: admin, clients
    • Labels: None

    Description

      Hi everybody, hope everyone is doing great.

      i) Introduction:
      I noticed the following exception (on a cluster with brokers running 2.3.1) when trying to describe a consumer group (using Kafka 2.7.1):

       

      ./kafka-consumer-groups.sh --describe --group order-validations
      Error: Executing consumer group command failed due to null
      java.lang.NullPointerException
       at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$6(ConsumerGroupCommand.scala:579)
       at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
       at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
       at scala.collection.convert.JavaCollectionWrappers$JSetWrapper.map(JavaCollectionWrappers.scala:180)
       at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$5(ConsumerGroupCommand.scala:578)
       at scala.collection.immutable.List.flatMap(List.scala:293)
       at scala.collection.immutable.List.flatMap(List.scala:79)
       at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$2(ConsumerGroupCommand.scala:574)
       at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
       at scala.collection.mutable.Growable.addAll(Growable.scala:62)
       at scala.collection.mutable.Growable.addAll$(Growable.scala:59)
       at scala.collection.mutable.HashMap.addAll(HashMap.scala:117)
       at scala.collection.mutable.HashMap$.from(HashMap.scala:570)
       at scala.collection.mutable.HashMap$.from(HashMap.scala:563)
       at scala.collection.MapOps$WithFilter.map(Map.scala:358)
       at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:569)
       at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:369)
       at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:76)
       at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
       at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)

       

      When using an older version of the AdminClient (2.3.1), the error is different:

      Error: Executing consumer group command failed due to java.lang.IllegalArgumentException: Invalid negative offset
      java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Invalid negative offset
       at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
       at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
       at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
       at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
       at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.getCommittedOffsets(ConsumerGroupCommand.scala:595)
       at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$2(ConsumerGroupCommand.scala:421)
       at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$Lambda$131/000000004CB1EFD0.apply(Unknown Source)
       at scala.collection.TraversableLike$WithFilter.$anonfun$map$2(TraversableLike.scala:827)
       at scala.collection.TraversableLike$WithFilter$$Lambda$132/000000004CD49E20.apply(Unknown Source)
       at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
       at scala.collection.mutable.HashMap$$Lambda$133/000000004CD4A4F0.apply(Unknown Source)
       at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
       at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
       at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
       at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
       at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:826)
       at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:419)
       at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:312)
       at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
       at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
      Caused by: java.lang.IllegalArgumentException: Invalid negative offset
       at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
       at org.apache.kafka.clients.admin.KafkaAdminClient$24$1.handleResponse(KafkaAdminClient.java:2832)
       at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1032)
       at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1160)
       at java.lang.Thread.run(Thread.java:820)

       

      The difference between those two outputs comes from the change made in KAFKA-9507.

      ii) Problem:
      The committed offsets for some partitions arrive at ConsumerGroupCommand as null. For the consumers assigned to those partitions, reading the offset value then throws a java.lang.NullPointerException, because ConsumerGroupCommand tries to map over a null value.
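      A minimal Java sketch of this failure mode, under stated assumptions: CommittedOffset and NullOffsetDemo are hypothetical names, and CommittedOffset is a local stand-in for OffsetAndMetadata so the example compiles without the Kafka client jar. ConsumerGroupCommand itself is Scala, where looking up a key whose value is null returns Some(null), so a following .map(_.offset) dereferences null. The Java analog dereferences the map value directly, and contrasts that with a null-safe read through Optional.ofNullable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for org.apache.kafka.clients.consumer.OffsetAndMetadata.
final class CommittedOffset {
    private final long offset;

    CommittedOffset(long offset) {
        this.offset = offset;
    }

    long offset() {
        return offset;
    }
}

class NullOffsetDemo {
    // Unsafe read: dereferences the map value directly, so a partition
    // mapped to null throws NullPointerException.
    static long unsafeOffset(Map<String, CommittedOffset> committed, String partition) {
        return committed.get(partition).offset();
    }

    // Null-safe read: a null value and a missing key both become Optional.empty().
    static Optional<Long> safeOffset(Map<String, CommittedOffset> committed, String partition) {
        return Optional.ofNullable(committed.get(partition)).map(CommittedOffset::offset);
    }

    public static void main(String[] args) {
        Map<String, CommittedOffset> committed = new HashMap<>();
        committed.put("rtl_orderReceive-0", null); // offset -1 reported as null
        committed.put("rtl_orderReceive-1", new CommittedOffset(39));

        System.out.println(safeOffset(committed, "rtl_orderReceive-1")); // Optional[39]
        System.out.println(safeOffset(committed, "rtl_orderReceive-0")); // Optional.empty
        try {
            unsafeOffset(committed, "rtl_orderReceive-0");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException for rtl_orderReceive-0");
        }
    }
}
```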

      iii) Example:
      a) Group information (from the describeConsumerGroups() method):
      (groupId=order-validations, isSimpleConsumerGroup=false, members=(memberId=order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6, groupInstanceId=null, clientId=order-validations, host=/127.0.0.1, assignment=(topicPartitions=rtl_orderReceive-0,rtl_orderReceive-1,rtl_orderReceive-2,rtl_orderReceive-3,rtl_orderReceive-4,rtl_orderReceive-5,rtl_orderReceive-6,rtl_orderReceive-7,rtl_orderReceive-8,rtl_orderReceive-9)), partitionAssignor=RoundRobinAssigner, state=Stable, coordinator=f0527.cluster.cl:31047 (id: 1 rack: null), authorizedOperations=[])

      b) Committed offsets information (from the getCommittedOffsets() method):
      Map(rtl_orderReceive-0 -> null, rtl_orderReceive-1 -> OffsetAndMetadata{offset=39, leaderEpoch=null, metadata=''}, rtl_orderReceive-2 -> null, rtl_orderReceive-3 -> OffsetAndMetadata{offset=33, leaderEpoch=null, metadata=''}, rtl_orderReceive-4 -> null, rtl_orderReceive-5 -> null, rtl_orderReceive-7 -> null, rtl_orderReceive-8 -> null)

      As shown, member order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6 is assigned to all ten partitions, but the committed offsets reported for partitions 0, 2, 4, 5, 7 and 8 are null.
      Reading the committed offset for rtl_orderReceive-0 then fails at .map(_.offset), because the value being mapped is null, so offset is called on a null reference. The value is null because the committed offset of that partition is -1, and KAFKA-9507 maps negative offsets to null.
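      To make the example concrete, the sketch below encodes the committed offsets from (b) as a map from partition number to offset and checks each of the ten assigned partitions. This is a simplification under stated assumptions: ExampleGroupCheck and its methods are hypothetical, and the real map is keyed by TopicPartition and holds OffsetAndMetadata values. It recovers the null entries for partitions 0, 2, 4, 5, 7 and 8, and also shows that partitions 6 and 9 are absent from the map altogether. An absent key is harmless in the Scala lookup, which returns None and is skipped by .map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ExampleGroupCheck {
    // Assigned partitions whose key is present but whose committed offset is null.
    static List<Integer> nullOffsets(int partitionCount, Map<Integer, Long> committed) {
        List<Integer> result = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            if (committed.containsKey(p) && committed.get(p) == null) {
                result.add(p);
            }
        }
        return result;
    }

    // Assigned partitions that do not appear in the committed-offsets map at all.
    static List<Integer> missingOffsets(int partitionCount, Map<Integer, Long> committed) {
        List<Integer> result = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            if (!committed.containsKey(p)) {
                result.add(p);
            }
        }
        return result;
    }

    // Committed offsets of rtl_orderReceive as listed in example (b), keyed by partition number.
    static Map<Integer, Long> exampleCommitted() {
        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, null);
        committed.put(1, 39L);
        committed.put(2, null);
        committed.put(3, 33L);
        committed.put(4, null);
        committed.put(5, null);
        committed.put(7, null);
        committed.put(8, null);
        return committed;
    }

    public static void main(String[] args) {
        Map<Integer, Long> committed = exampleCommitted();
        System.out.println("null:    " + nullOffsets(10, committed));    // [0, 2, 4, 5, 7, 8]
        System.out.println("missing: " + missingOffsets(10, committed)); // [6, 9]
    }
}
```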

      iv) Proposals:

      a) Fix locally in ConsumerGroupCommand:
      Add a filter to the committed offsets arriving from upstream to catch edge cases. That way, even if upstream returns null values instead of an OffsetAndMetadata, describeGroups would still work and return the consumer group description.

      From:

      val committedOffsets = getCommittedOffsets(groupId)

      To:

      val committedOffsets = getCommittedOffsets(groupId).filter(_._2.isInstanceOf[OffsetAndMetadata])
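      In Scala, null.isInstanceOf[OffsetAndMetadata] evaluates to false, so the filter above drops exactly the entries whose value is null (a plain _._2 != null test would be equivalent). The Java sketch below shows the same idea outside ConsumerGroupCommand; CommittedOffsetFilter is a hypothetical name, and Long values stand in for OffsetAndMetadata for brevity. After filtering, a partition with a negative offset simply has no entry, so the downstream lookup sees a missing key rather than a null value.

```java
import java.util.HashMap;
import java.util.Map;

class CommittedOffsetFilter {
    // Returns a copy of the map without entries whose value is null,
    // the same effect as the Scala filter(_._2.isInstanceOf[OffsetAndMetadata]).
    static <K, V> Map<K, V> withoutNullValues(Map<K, V> offsets) {
        Map<K, V> result = new HashMap<>();
        offsets.forEach((partition, offset) -> {
            if (offset != null) {
                result.put(partition, offset);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();
        committed.put("rtl_orderReceive-0", null);
        committed.put("rtl_orderReceive-1", 39L);

        Map<String, Long> filtered = withoutNullValues(committed);
        System.out.println(filtered); // {rtl_orderReceive-1=39}
    }
}
```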

       

      b) Fix upstream in KafkaAdminClient's listConsumerGroupOffsets method:
      This is related to KAFKA-9507. In that issue, negative offsets were handled by explicitly mapping the topic partition to null:

      if (offset < 0) {
          groupOffsetsListing.put(topicPartition, null);
      } else {
          groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
      }

       

      That approach prevents org.apache.kafka.clients.consumer.OffsetAndMetadata from throwing an 'Invalid negative offset' error, but it affects downstream methods that use KafkaAdminClient's listConsumerGroupOffsets method (such as the one behind kafka-consumer-groups.sh).

      The proposal is to skip returning an offset for topic partitions whose offsets are negative:

      if (offset >= 0) {
          groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
      }
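      The two upstream strategies can be compared side by side in a small Java sketch. Committed is a hypothetical stand-in for OffsetAndMetadata whose constructor rejects negative offsets, mirroring the 'Invalid negative offset' check in the 2.3.1 stack trace; OffsetListingDemo, nullForNegative and skipNegative are illustrative names, not KafkaAdminClient code. The input uses -1 for a partition with no committed offset, as in the example above.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetListingDemo {
    // Hypothetical stand-in for OffsetAndMetadata: the constructor rejects negative offsets.
    static final class Committed {
        final long offset;

        Committed(long offset) {
            if (offset < 0) {
                throw new IllegalArgumentException("Invalid negative offset");
            }
            this.offset = offset;
        }
    }

    // KAFKA-9507 behaviour: a negative offset is kept in the listing, mapped to null.
    static Map<String, Committed> nullForNegative(Map<String, Long> raw) {
        Map<String, Committed> listing = new HashMap<>();
        raw.forEach((tp, offset) -> listing.put(tp, offset < 0 ? null : new Committed(offset)));
        return listing;
    }

    // Proposal (b): a negative offset is left out of the listing entirely.
    static Map<String, Committed> skipNegative(Map<String, Long> raw) {
        Map<String, Committed> listing = new HashMap<>();
        raw.forEach((tp, offset) -> {
            if (offset >= 0) {
                listing.put(tp, new Committed(offset));
            }
        });
        return listing;
    }

    public static void main(String[] args) {
        Map<String, Long> raw = new HashMap<>();
        raw.put("rtl_orderReceive-0", -1L); // no committed offset
        raw.put("rtl_orderReceive-1", 39L);

        System.out.println(nullForNegative(raw).containsKey("rtl_orderReceive-0")); // true (value is null)
        System.out.println(skipNegative(raw).containsKey("rtl_orderReceive-0"));    // false
    }
}
```

      In Java, Map.get returns null in both cases. The difference matters to code that iterates over the entries, and to a Scala lookup like the one in ConsumerGroupCommand, which returns None for a missing key but Some(null) for a null value.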

       

      This would remove negative offsets from the listConsumerGroupOffsets result and guarantee that every returned value is a valid OffsetAndMetadata. Like KAFKA-9507, it handles negative offsets, but without affecting downstream methods that expect an OffsetAndMetadata rather than a null value.

      I think the second approach is cleaner because it spares downstream methods from handling the null edge case, which can lead to exceptions (as seen above).

      I have been working on both approaches and am ready to prepare a PR. What do you think?


          People

            IgnacioAcunaFri Ignacio Acuna
            IgnacioAcunaFri Ignacio Acuna
            David Jacot David Jacot
            Votes: 0
            Watchers: 1


              Time Tracking

              Original Estimate: 24h
              Remaining Estimate: 24h
              Time Spent: Not Specified
