KAFKA-4879

KafkaConsumer.position may hang forever when deleting a topic

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.2.0
    • Fix Version/s: 2.0.0
    • Component/s: consumer
    • Labels: None

    Description

      KafkaConsumer.position may hang forever when a topic is deleted. The problem is this line: https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
      The timeout passed there is Long.MAX_VALUE, so the call keeps retrying forever on UnknownTopicOrPartitionException, which does not clear once the topic is gone.

      Here is a reproducer:

      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.TopicPartition;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import java.util.Collections;
      import java.util.Properties;
      import java.util.Set;
      
      public class KafkaReproducer {
      
        public static void main(String[] args) {
          // Make sure the broker has "delete.topic.enable" set to true.
          // Create the topic "test" with 3 partitions manually before running.
          // The issue does not reproduce when the topic has only one partition.
          String topic = "test";
      
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "testgroup");
          props.put("value.deserializer", StringDeserializer.class.getName());
          props.put("key.deserializer", StringDeserializer.class.getName());
          props.put("enable.auto.commit", "false");
      
          KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
      
          kc.subscribe(Collections.singletonList(topic));
      
          kc.poll(0);
          Set<TopicPartition> partitions = kc.assignment();
          System.out.println("partitions: " + partitions);
          kc.pause(partitions);
          kc.seekToEnd(partitions);
      
          System.out.println("please delete the topic in 30 seconds");
          try {
            // Sleep 30 seconds to give us enough time to delete the topic.
            Thread.sleep(30000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          System.out.println("sleep end");
          for (TopicPartition p : partitions) {
            System.out.println(p + " offset: " + kc.position(p));
          }
          System.out.println("cannot reach here");
          kc.close();
        }
      }
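
The unbounded retry described above can be illustrated without a broker. The following is a minimal sketch in plain Java, not the Fetcher code: `BoundedRetry`, `retryUntil`, and `RetriableException` are hypothetical names introduced only for this example, and `RetriableException` merely stands in for a retriable error such as UnknownTopicOrPartitionException. It shows that a finite deadline hands control back to the caller, whereas passing Long.MAX_VALUE as the timeout would make the same loop spin forever when the error never clears.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical illustration of a retry loop with a deadline; not Kafka code.
final class BoundedRetry {

    /** Stand-in for a retriable error such as UnknownTopicOrPartitionException. */
    static final class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    /**
     * Calls op until it succeeds or timeoutMs has elapsed.
     * With timeoutMs = Long.MAX_VALUE the deadline is never reached, so a
     * permanently failing op keeps this method retrying indefinitely.
     */
    static <T> T retryUntil(Supplier<T> op, long timeoutMs, long backoffMs)
            throws TimeoutException, InterruptedException {
        long start = System.nanoTime();
        while (true) {
            try {
                return op.get();
            } catch (RetriableException e) {
                long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
                if (elapsedMs >= timeoutMs) {
                    throw new TimeoutException(
                        "gave up after " + elapsedMs + " ms: " + e.getMessage());
                }
                // Never sleep past the deadline.
                Thread.sleep(Math.min(backoffMs, timeoutMs - elapsedMs));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulates a deleted topic: every attempt fails with a retriable error.
        Supplier<Long> deletedTopic = () -> {
            throw new RetriableException("unknown topic or partition");
        };
        try {
            retryUntil(deletedTopic, 200L, 50L);
        } catch (TimeoutException e) {
            System.out.println("bounded retry returned control: " + e.getMessage());
        }
    }
}
```

Replacing `200L` with `Long.MAX_VALUE` in `main` reproduces the shape of the hang: the loop has no other exit when the operation fails on every attempt.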
      

            People

              Assignee: Jason Gustafson (hachikuji)
              Reporter: Shixiong Zhu (zsxwing)
              Votes: 13
              Watchers: 18
