Description
Hi everybody, hope everyone is doing great.
i) Introduction:
I noticed the following exception (on a cluster with brokers running 2.3.1) when trying to describe a consumer group using the Kafka 2.7.1 tools:
./kafka-consumer-groups.sh --describe --group order-validations
Error: Executing consumer group command failed due to null
java.lang.NullPointerException
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$6(ConsumerGroupCommand.scala:579)
at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
at scala.collection.convert.JavaCollectionWrappers$JSetWrapper.map(JavaCollectionWrappers.scala:180)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$5(ConsumerGroupCommand.scala:578)
at scala.collection.immutable.List.flatMap(List.scala:293)
at scala.collection.immutable.List.flatMap(List.scala:79)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$2(ConsumerGroupCommand.scala:574)
at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
at scala.collection.mutable.Growable.addAll(Growable.scala:62)
at scala.collection.mutable.Growable.addAll$(Growable.scala:59)
at scala.collection.mutable.HashMap.addAll(HashMap.scala:117)
at scala.collection.mutable.HashMap$.from(HashMap.scala:570)
at scala.collection.mutable.HashMap$.from(HashMap.scala:563)
at scala.collection.MapOps$WithFilter.map(Map.scala:358)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:569)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:369)
at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:76)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
When trying with an older version of the AdminClient (2.3.1), the command fails with a different exception:
Error: Executing consumer group command failed due to java.lang.IllegalArgumentException: Invalid negative offset
java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Invalid negative offset
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.getCommittedOffsets(ConsumerGroupCommand.scala:595)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$2(ConsumerGroupCommand.scala:421)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$Lambda$131/000000004CB1EFD0.apply(Unknown Source)
at scala.collection.TraversableLike$WithFilter.$anonfun$map$2(TraversableLike.scala:827)
at scala.collection.TraversableLike$WithFilter$$Lambda$132/000000004CD49E20.apply(Unknown Source)
at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashMap$$Lambda$133/000000004CD4A4F0.apply(Unknown Source)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:826)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:419)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:312)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: java.lang.IllegalArgumentException: Invalid negative offset
at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
at org.apache.kafka.clients.admin.KafkaAdminClient$24$1.handleResponse(KafkaAdminClient.java:2832)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1032)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1160)
at java.lang.Thread.run(Thread.java:820)
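The difference between the two outputs comes from the change made in KAFKA-9507: the 2.3.1 AdminClient fails while constructing an OffsetAndMetadata from a negative offset, whereas the newer AdminClient stores null for that partition instead, which later surfaces as a NullPointerException in ConsumerGroupCommand.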
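A simplified Java sketch of how the two client versions treat a committed offset of -1. It assumes a stand-in OffsetAndMetadata class (offset only, with the same negative-offset check that appears in the 2.3.1 trace) and String keys in place of TopicPartition; listBefore9507 and listAfter9507 are hypothetical helpers, not the actual KafkaAdminClient code.

```java
import java.util.HashMap;
import java.util.Map;

class NegativeOffsetHandling {

    // Stand-in for org.apache.kafka.clients.consumer.OffsetAndMetadata (hypothetical,
    // offset only). Like the real class in the trace above, it rejects negative offsets.
    static final class OffsetAndMetadata {
        private final long offset;

        OffsetAndMetadata(long offset) {
            if (offset < 0) {
                throw new IllegalArgumentException("Invalid negative offset");
            }
            this.offset = offset;
        }

        long offset() {
            return offset;
        }
    }

    // Sketch of the pre-KAFKA-9507 behaviour: the constructor is called unconditionally,
    // so a single offset of -1 makes the whole listing fail.
    static Map<String, OffsetAndMetadata> listBefore9507(Map<String, Long> brokerOffsets) {
        Map<String, OffsetAndMetadata> listing = new HashMap<>();
        brokerOffsets.forEach((tp, offset) -> listing.put(tp, new OffsetAndMetadata(offset)));
        return listing;
    }

    // Sketch of the post-KAFKA-9507 behaviour: a negative offset is stored as a null value.
    static Map<String, OffsetAndMetadata> listAfter9507(Map<String, Long> brokerOffsets) {
        Map<String, OffsetAndMetadata> listing = new HashMap<>();
        brokerOffsets.forEach((tp, offset) ->
                listing.put(tp, offset < 0 ? null : new OffsetAndMetadata(offset)));
        return listing;
    }

    public static void main(String[] args) {
        Map<String, Long> brokerOffsets = new HashMap<>();
        brokerOffsets.put("rtl_orderReceive-0", -1L);
        brokerOffsets.put("rtl_orderReceive-1", 39L);

        try {
            listBefore9507(brokerOffsets);
        } catch (IllegalArgumentException e) {
            System.out.println("before KAFKA-9507: " + e.getMessage());
        }

        Map<String, OffsetAndMetadata> after = listAfter9507(brokerOffsets);
        System.out.println("after KAFKA-9507, rtl_orderReceive-0 -> " + after.get("rtl_orderReceive-0"));
    }
}
```

The first variant fails fast (the 2.3.1 symptom); the second succeeds but hands a null value to every caller (the 2.7.1 symptom).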
ii) Problem:
The committed offsets for some partitions arrive at ConsumerGroupCommand as null. Then, for the consumers assigned to those partitions, reading the offset value throws a java.lang.NullPointerException, because ConsumerGroupCommand tries to map over a null value.
iii) Example:
a) GroupID information (from describeConsumerGroups() method):
(groupId=order-validations, isSimpleConsumerGroup=false, members=(memberId=order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6, groupInstanceId=null, clientId=order-validations, host=/127.0.0.1, assignment=(topicPartitions=rtl_orderReceive-0,rtl_orderReceive-1,rtl_orderReceive-2,rtl_orderReceive-3,rtl_orderReceive-4,rtl_orderReceive-5,rtl_orderReceive-6,rtl_orderReceive-7,rtl_orderReceive-8,rtl_orderReceive-9)), partitionAssignor=RoundRobinAssigner, state=Stable, coordinator=f0527.cluster.cl:31047 (id: 1 rack: null), authorizedOperations=[])
b) Committed offsets information (from the getCommittedOffsets() method):
Map(rtl_orderReceive-0 -> null, rtl_orderReceive-1 -> OffsetAndMetadata{offset=39, leaderEpoch=null, metadata=''}, rtl_orderReceive-2 -> null, rtl_orderReceive-3 -> OffsetAndMetadata{offset=33, leaderEpoch=null, metadata=''}, rtl_orderReceive-4 -> null, rtl_orderReceive-5 -> null, rtl_orderReceive-7 -> null, rtl_orderReceive-8 -> null)
As shown, member order-validations-d5fbca62-ab2b-48d7-96ba-0ae72dff72a6 is assigned all ten partitions (rtl_orderReceive-0 to rtl_orderReceive-9), but the committed offsets reported for partitions 0, 2, 4, 5, 7 and 8 are null (partitions 6 and 9 do not appear in the map at all).
Looking up the committed offset for rtl_orderReceive-0 then fails at .map(_.offset), because _.offset is evaluated on a null OffsetAndMetadata. This happens because the committed offset for those partitions is -1, which is mapped to null (as introduced in KAFKA-9507).
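The failure can be reproduced in isolation with the committed offsets from the example above. This Java sketch uses a hypothetical stand-in OffsetAndMetadata (offset only) and String keys instead of TopicPartition; naiveOffset and safeOffset are illustrative names. In Java, Map.get returns null both for an absent key and for a key mapped to null, so the sketch only approximates the Scala code path in ConsumerGroupCommand.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class NullOffsetLookup {

    // Hypothetical stand-in for org.apache.kafka.clients.consumer.OffsetAndMetadata (offset only).
    static final class OffsetAndMetadata {
        private final long offset;

        OffsetAndMetadata(long offset) {
            this.offset = offset;
        }

        long offset() {
            return offset;
        }
    }

    // Committed offsets as reported in the example; String keys stand in for TopicPartition.
    static Map<String, OffsetAndMetadata> exampleCommittedOffsets() {
        Map<String, OffsetAndMetadata> committed = new HashMap<>();
        committed.put("rtl_orderReceive-0", null);
        committed.put("rtl_orderReceive-1", new OffsetAndMetadata(39));
        committed.put("rtl_orderReceive-2", null);
        committed.put("rtl_orderReceive-3", new OffsetAndMetadata(33));
        committed.put("rtl_orderReceive-4", null);
        committed.put("rtl_orderReceive-5", null);
        committed.put("rtl_orderReceive-7", null);
        committed.put("rtl_orderReceive-8", null);
        return committed;
    }

    // Dereferences the lookup result directly: throws NullPointerException when the
    // value is null (or, in Java, when the key is missing).
    static long naiveOffset(Map<String, OffsetAndMetadata> committed, String tp) {
        return committed.get(tp).offset();
    }

    // Null-safe variant: a missing key and a null value both become Optional.empty().
    static Optional<Long> safeOffset(Map<String, OffsetAndMetadata> committed, String tp) {
        return Optional.ofNullable(committed.get(tp)).map(OffsetAndMetadata::offset);
    }

    public static void main(String[] args) {
        Map<String, OffsetAndMetadata> committed = exampleCommittedOffsets();
        for (int p = 0; p < 10; p++) {
            String tp = "rtl_orderReceive-" + p;
            System.out.println(tp + " -> " + safeOffset(committed, tp));
        }
        try {
            naiveOffset(committed, "rtl_orderReceive-0");
        } catch (NullPointerException e) {
            System.out.println("naive lookup of rtl_orderReceive-0 threw NullPointerException");
        }
    }
}
```

Wrapping the lookup in Optional.ofNullable(...).map(...) treats a null value like a missing entry, so in this sketch partitions 0, 2, 4, 5, 7 and 8 are reported without an offset instead of aborting the whole loop.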
iv) Proposals:
a) Fix locally in ConsumerGroupCommand:
Add a filter to the committed offsets arriving from upstream to catch edge cases. That way, even if upstream returns null values instead of an OffsetAndMetadata, describeGroups still works and returns the consumer group description.
From:
val committedOffsets = getCommittedOffsets(groupId)
To:
val committedOffsets = getCommittedOffsets(groupId).filter(_._2.isInstanceOf[OffsetAndMetadata])
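For comparison, the same filtering idea expressed in Java. This is a sketch assuming a hypothetical stand-in OffsetAndMetadata class and String keys in place of TopicPartition; the helper name withoutNulls is illustrative. The Scala filter above works because null.isInstanceOf[OffsetAndMetadata] is false, which is equivalent to the explicit null check used here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class FilterNullOffsets {

    // Hypothetical stand-in for org.apache.kafka.clients.consumer.OffsetAndMetadata (offset only).
    static final class OffsetAndMetadata {
        private final long offset;

        OffsetAndMetadata(long offset) {
            this.offset = offset;
        }

        long offset() {
            return offset;
        }
    }

    // Keeps only entries whose value is a real OffsetAndMetadata. The filter must run
    // before Collectors.toMap, which rejects null values with a NullPointerException.
    static Map<String, OffsetAndMetadata> withoutNulls(Map<String, OffsetAndMetadata> committed) {
        return committed.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, OffsetAndMetadata> committed = new HashMap<>();
        committed.put("rtl_orderReceive-0", null);
        committed.put("rtl_orderReceive-1", new OffsetAndMetadata(39));
        committed.put("rtl_orderReceive-2", null);
        committed.put("rtl_orderReceive-3", new OffsetAndMetadata(33));
        committed.put("rtl_orderReceive-4", null);

        // TreeMap only to print the surviving partitions in a stable order.
        System.out.println(new TreeMap<>(withoutNulls(committed)).keySet());
    }
}
```

Applied to the example offsets, only rtl_orderReceive-1 and rtl_orderReceive-3 survive the filter, so the remaining assigned partitions simply have no committed offset entry.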
b) Fix upstream in KafkaAdminClient's listConsumerGroupOffsets method:
This is related to KAFKA-9507. In that issue, negative offsets were handled by explicitly mapping the topicPartition to null:
if (offset < 0) {
    groupOffsetsListing.put(topicPartition, null);
} else {
    groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
}
That approach prevents org.apache.kafka.clients.consumer.OffsetAndMetadata from throwing an 'Invalid negative offset' error, but it affects downstream callers of KafkaAdminClient's listConsumerGroupOffsets method (such as the one behind kafka-consumer-groups.sh).
The proposal is to skip returning an offset for topic partitions whose offsets are negative:
if (offset >= 0) {
    groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
}
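A simplified Java sketch of proposal b), under stated assumptions: a hypothetical stand-in OffsetAndMetadata that rejects negative offsets, String keys instead of TopicPartition, and a hypothetical listSkippingNegatives helper that takes the raw offsets returned by the broker (leaderEpoch and metadata are omitted).

```java
import java.util.HashMap;
import java.util.Map;

class SkipNegativeOffsets {

    // Hypothetical stand-in for org.apache.kafka.clients.consumer.OffsetAndMetadata,
    // keeping the same negative-offset validation.
    static final class OffsetAndMetadata {
        private final long offset;

        OffsetAndMetadata(long offset) {
            if (offset < 0) {
                throw new IllegalArgumentException("Invalid negative offset");
            }
            this.offset = offset;
        }

        long offset() {
            return offset;
        }
    }

    // Only non-negative offsets are added: the constructor never sees -1, and the
    // resulting listing never contains a null value.
    static Map<String, OffsetAndMetadata> listSkippingNegatives(Map<String, Long> brokerOffsets) {
        Map<String, OffsetAndMetadata> listing = new HashMap<>();
        for (Map.Entry<String, Long> e : brokerOffsets.entrySet()) {
            long offset = e.getValue();
            if (offset >= 0) {
                listing.put(e.getKey(), new OffsetAndMetadata(offset));
            }
        }
        return listing;
    }

    public static void main(String[] args) {
        // Raw offsets matching the example: -1 where no offset is committed.
        Map<String, Long> brokerOffsets = new HashMap<>();
        brokerOffsets.put("rtl_orderReceive-0", -1L);
        brokerOffsets.put("rtl_orderReceive-1", 39L);
        brokerOffsets.put("rtl_orderReceive-2", -1L);
        brokerOffsets.put("rtl_orderReceive-3", 33L);
        brokerOffsets.put("rtl_orderReceive-4", -1L);
        brokerOffsets.put("rtl_orderReceive-5", -1L);
        brokerOffsets.put("rtl_orderReceive-7", -1L);
        brokerOffsets.put("rtl_orderReceive-8", -1L);

        Map<String, OffsetAndMetadata> listing = listSkippingNegatives(brokerOffsets);
        for (int p = 0; p < 10; p++) {
            String tp = "rtl_orderReceive-" + p;
            OffsetAndMetadata om = listing.get(tp);
            System.out.println(tp + " -> " + (om == null ? "no committed offset" : String.valueOf(om.offset())));
        }
    }
}
```

With this shape, callers can rely on the invariant that every value in the listing is non-null; a partition without a committed offset is simply absent, exactly like partitions 6 and 9 in the example.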
This would remove negative offsets from the listConsumerGroupOffsets result and guarantee that every returned value is a valid OffsetAndMetadata. It still handles negative offsets, as KAFKA-9507 does, but without impacting downstream methods that expect an OffsetAndMetadata rather than a null value.
I think the second approach is cleaner, because it spares downstream methods from handling the null edge case, which can lead to exceptions (as seen above).
I have been working on both approaches and I am ready to prepare a PR. What do you think?