Kafka / KAFKA-7000

KafkaConsumer.position should wait for assignment metadata

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.0.0
    • Component/s: clients
    • Labels:
      None

      Description

      While updating Kafka Streams to stop using the deprecated Consumer.poll(long), I found that this code unexpectedly throws an exception:

      consumer.subscribe(topics);
      // consumer.poll(0); <- I've removed this line, which shouldn't be necessary here.
      
      final Set<TopicPartition> partitions = new HashSet<>();
      for (final String topic : topics) {
          for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
              partitions.add(new TopicPartition(partition.topic(), partition.partition()));
          }
      }
      
      for (final TopicPartition tp : partitions) {
          final long offset = consumer.position(tp);
          committedOffsets.put(tp, offset);
      }

      Here is the exception:

      Exception in thread "main" java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
         at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620)
         at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586)
         at org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275)
         at org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148)
         at org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69)

       

      As the commented-out line in my snippet shows, we used to block for assignment with a poll(0), which is now deprecated.

      It seems reasonable to me for position() to do the same thing that poll() does: call `coordinator.poll(timeout.toMillis())` early in processing to ensure an up-to-date assignment.
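
To illustrate the proposal, here is a minimal toy model, not the real KafkaConsumer. The class `ToyConsumer` and the names `coordinatorPoll`, `positionWithoutWait`, and `positionWithWait` are hypothetical and exist only for this sketch. Partitions are represented as plain strings, and the "coordinator" simply hands every known partition to the consumer. The first position method mimics the reported behavior (it throws if the assignment has not been fetched yet); the second refreshes the assignment first, which is the change suggested above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy stand-in for a consumer. Hypothetical; NOT the real KafkaConsumer API. */
class ToyConsumer {
    // Offsets the pretend group coordinator knows about, keyed by partition name.
    private final Map<String, Long> coordinatorOffsets;
    // Partitions this consumer currently believes are assigned to it.
    private final Set<String> assignment = new HashSet<>();
    private boolean subscribed = false;

    ToyConsumer(Map<String, Long> coordinatorOffsets) {
        this.coordinatorOffsets = new HashMap<>(coordinatorOffsets);
    }

    /** Subscribing alone does not populate the assignment. */
    void subscribe() {
        subscribed = true;
    }

    /** Stand-in for the coordinator poll: fetches the assignment if subscribed. */
    private void coordinatorPoll() {
        if (subscribed) {
            assignment.addAll(coordinatorOffsets.keySet());
        }
    }

    /** Mimics the reported behavior: fails if the assignment was never fetched. */
    long positionWithoutWait(String partition) {
        if (!assignment.contains(partition)) {
            throw new IllegalStateException(
                "You can only check the position for partitions assigned to this consumer.");
        }
        return coordinatorOffsets.get(partition);
    }

    /** Mimics the proposal: refresh the assignment before checking it. */
    long positionWithWait(String partition) {
        coordinatorPoll();
        return positionWithoutWait(partition);
    }
}
```

In this model, calling `positionWithoutWait` right after `subscribe()` throws, while `positionWithWait` succeeds because it updates the assignment first. The real client would also have to handle timeouts and partitions assigned to other group members, which this sketch ignores.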

              People

              • Assignee:
                vvcephei John Roesler
                Reporter:
                vvcephei John Roesler
              • Votes:
                0
                Watchers:
                3
