KAFKA-9233

Kafka consumer throws undocumented IllegalStateException


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.5.0
    • Component/s: consumer
    • Labels:
      None

      Description

      If the provided collection of TopicPartition instances contains any duplicates, calling KafkaConsumer#beginningOffsets or KafkaConsumer#endOffsets throws an IllegalStateException that is raised by internal Java stream code and is not documented in the javadoc.
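      For reference, here is a minimal snippet that reproduces the problem against 2.3.0 (the bootstrap server, topic name, and class name are placeholders, not part of the affected API):

      import java.util.Arrays;
      import java.util.List;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.TopicPartition;

      public class DuplicatePartitionRepro {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");

              try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                  // The same TopicPartition appears twice in the collection.
                  TopicPartition tp = new TopicPartition("test", 0);
                  List<TopicPartition> partitions = Arrays.asList(tp, tp);

                  // On 2.3.0 this throws java.lang.IllegalStateException: Duplicate key ...
                  consumer.beginningOffsets(partitions);
              }
          }
      }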

      The resulting stack traces look like this:

      java.lang.IllegalStateException: Duplicate key -2
      	at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
      	at java.util.HashMap.merge(HashMap.java:1254)
      	at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
      	at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
      	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
      	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
      	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
      	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
      	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
      	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
      	at org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555)
      	at org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031)
      
      java.lang.IllegalStateException: Duplicate key -1
      	at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
      	at java.util.HashMap.merge(HashMap.java:1254)
      	at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
      	at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
      	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
      	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
      	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
      	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
      	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
      	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
      	at org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559)
      	at org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081)
      

      Looking at the code, this appears likely to have been introduced by KAFKA-7831. The exception is not thrown in Kafka 2.2.1, where the duplicated TopicPartition values are silently ignored. We should either document the possibility of this exception (probably wrapping it in a Kafka exception class that indicates invalid client API usage) or restore the previous behavior, in which the duplicates were harmless.
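      Until that is resolved, affected callers can work around the issue by de-duplicating the collection before the call. A sketch (the helper class and method names are illustrative; it relies on TopicPartition implementing equals/hashCode):

      import java.util.Collection;
      import java.util.HashSet;
      import java.util.Map;
      import org.apache.kafka.clients.consumer.Consumer;
      import org.apache.kafka.common.TopicPartition;

      public final class OffsetLookups {
          // Illustrative workaround: collapse duplicates into a HashSet before the
          // lookup, so the consumer never sees the same TopicPartition twice.
          public static Map<TopicPartition, Long> dedupedBeginningOffsets(
                  Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
              return consumer.beginningOffsets(new HashSet<>(partitions));
          }
      }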


              People

              • Assignee:
                Andrew Olson (noslowerdna)
              • Reporter:
                Andrew Olson (noslowerdna)
              • Votes:
                0
              • Watchers:
                2

                Dates

                • Created:
                • Updated:
                • Resolved: