Kafka / KAFKA-7000

KafkaConsumer.position should wait for assignment metadata


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.0.0
    • Component/s: clients
    • Labels: None

    Description

      While updating Kafka Streams to stop using the deprecated Consumer.poll(long), I found that this code unexpectedly throws an exception:

      consumer.subscribe(topics);
      // consumer.poll(0); <- I've removed this line, which shouldn't be necessary here.
      
      final Set<TopicPartition> partitions = new HashSet<>();
      for (final String topic : topics) {
          for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
              partitions.add(new TopicPartition(partition.topic(), partition.partition()));
          }
      }
      
      for (final TopicPartition tp : partitions) {
          final long offset = consumer.position(tp);
          committedOffsets.put(tp, offset);
      }

      Here is the exception:

      Exception in thread "main" java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
         at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620)
         at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586)
         at org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275)
         at org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148)
         at org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69)


      As you can see in the commented code in my snippet, we used to block for assignment with a poll(0), which is now deprecated.
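      For callers that still need to block until an assignment arrives, one possible stopgap is to poll in short steps until the assignment is non-empty. The sketch below is only an illustration: AssignmentWaiter and awaitAssignment are hypothetical names, not part of the Kafka client, and the helper is written against plain java.util.function types so it carries no Kafka dependency.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical helper, not part of the Kafka client API. */
final class AssignmentWaiter {
    private AssignmentWaiter() {}

    /**
     * Runs pollOnce repeatedly until assignment returns a non-empty set
     * or the timeout elapses. Returns the last observed assignment, which
     * is empty if the deadline passed first.
     */
    static <T> Set<T> awaitAssignment(final Runnable pollOnce,
                                      final Supplier<Set<T>> assignment,
                                      final Duration timeout) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        Set<T> current = assignment.get();
        while (current.isEmpty() && System.nanoTime() - deadline < 0) {
            pollOnce.run();              // e.g. one short consumer.poll(Duration)
            current = assignment.get();  // re-check after each poll
        }
        return current;
    }
}
```

      With a real consumer this might be wired up as AssignmentWaiter.awaitAssignment(() -> consumer.poll(Duration.ofMillis(100)), consumer::assignment, Duration.ofSeconds(30)). Note that any records returned by those polls are discarded in this sketch and the position advances past them, so it only suits callers that do not need those records.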

      It seems reasonable to me for position() to do the same thing that poll() does, which is to call `coordinator.poll(timeout.toMillis())` early in processing to ensure that the assignment is up to date before the partition check runs.
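      To make the ordering concrete, here is a toy model of the two behaviors. It is not the Kafka client and not the actual fix: ToyConsumer, refreshAssignment, and the refreshInPosition flag are all invented for illustration, and partitions are plain strings. The only point is that refreshing the assignment before the "is this partition assigned?" check removes the need for a prior poll().

```java
import java.util.HashSet;
import java.util.Set;

/** Toy stand-in for a consumer; every name here is hypothetical. */
final class ToyConsumer {
    // Partitions the (imaginary) coordinator would hand out on its next poll.
    private final Set<String> pendingAssignment = new HashSet<>();
    private final Set<String> assigned = new HashSet<>();
    private final boolean refreshInPosition;

    ToyConsumer(final Set<String> partitionsFromCoordinator, final boolean refreshInPosition) {
        this.pendingAssignment.addAll(partitionsFromCoordinator);
        this.refreshInPosition = refreshInPosition;
    }

    /** Stand-in for the coordinator poll that completes a pending assignment. */
    private void refreshAssignment() {
        assigned.addAll(pendingAssignment);
        pendingAssignment.clear();
    }

    /** Stand-in for poll(): refreshes the assignment, returns nothing. */
    void poll() {
        refreshAssignment();
    }

    /** Returns a dummy position of 0 for any assigned partition. */
    long position(final String partition) {
        if (refreshInPosition) {
            refreshAssignment(); // proposed: update the assignment first
        }
        if (!assigned.contains(partition)) {
            throw new IllegalStateException(
                "You can only check the position for partitions assigned to this consumer.");
        }
        return 0L;
    }
}
```

      In this model, a ToyConsumer built with refreshInPosition set to false throws from position() until poll() has been called once, which mirrors the report above; with the flag set to true, position() succeeds on the first call.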


            People

              Assignee: lucasbru Lucas Brutschy
              Reporter: vvcephei John Roesler
              Votes: 0
              Watchers: 3
