  1. Kafka
  2. KAFKA-4879

KafkaConsumer.position may hang forever when deleting a topic

    Details

    • Type: Bug
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.10.2.0
    • Fix Version/s: 2.0.0
    • Component/s: consumer
    • Labels:
      None

      Description

      KafkaConsumer.position may hang forever when a topic is deleted. The problem is at this line: https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
      The timeout there is Long.MAX_VALUE, so the call keeps retrying forever on UnknownTopicOrPartitionException.
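
To illustrate why a Long.MAX_VALUE timeout turns a retriable error into a hang, here is a minimal sketch of a retry loop with a deadline. It is not the Fetcher code: `BoundedRetry`, `retryUntil`, and `RetriableException` are hypothetical names, and `RetriableException` only stands in for UnknownTopicOrPartitionException. With a finite timeout the loop gives up; with Long.MAX_VALUE the deadline check can never succeed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Kafka: retries a call until it succeeds
// or a deadline passes.
final class BoundedRetry {

  // Stand-in for a retriable error such as UnknownTopicOrPartitionException.
  static final class RetriableException extends RuntimeException {
    RetriableException(String message) {
      super(message);
    }
  }

  // Retries `call` whenever it throws RetriableException. If timeoutMs is
  // Long.MAX_VALUE the deadline check below is never true, so a call that
  // always fails is retried forever.
  static <T> T retryUntil(Callable<T> call, long timeoutMs, long backoffMs)
      throws Exception {
    long start = System.nanoTime();
    while (true) {
      try {
        return call.call();
      } catch (RetriableException e) {
        long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
        if (elapsedMs >= timeoutMs) {
          throw new TimeoutException(
              "gave up after " + elapsedMs + " ms: " + e.getMessage());
        }
        Thread.sleep(backoffMs); // back off before the next attempt
      }
    }
  }

  public static void main(String[] args) throws Exception {
    try {
      // Simulates a lookup against a topic that no longer exists.
      retryUntil(() -> {
        throw new RetriableException("unknown topic or partition");
      }, 200, 20);
    } catch (TimeoutException e) {
      System.out.println("timed out: " + e.getMessage());
    }
  }
}
```

Passing Long.MAX_VALUE as `timeoutMs` in the sketch reproduces the shape of the hang: the error is always considered retriable and the deadline is never reached.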

      Here is a reproducer:

      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.TopicPartition;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import java.util.Collections;
      import java.util.Properties;
      import java.util.Set;
      
      public class KafkaReproducer {
      
        public static void main(String[] args) {
          // Make sure "delete.topic.enable" is set to true on the broker.
          // Create the topic "test" with 3 partitions manually before running.
          // The issue does not reproduce when the topic has only one partition.
          String topic = "test";
      
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "testgroup");
          props.put("value.deserializer", StringDeserializer.class.getName());
          props.put("key.deserializer", StringDeserializer.class.getName());
          props.put("enable.auto.commit", "false");
      
          KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
      
          kc.subscribe(Collections.singletonList(topic));
      
          kc.poll(0);
          Set<TopicPartition> partitions = kc.assignment();
          System.out.println("partitions: " + partitions);
          kc.pause(partitions);
          kc.seekToEnd(partitions);
      
          System.out.println("please delete the topic in 30 seconds");
          try {
            // Sleep 30 seconds to give us enough time to delete the topic.
            Thread.sleep(30000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          System.out.println("sleep end");
          for (TopicPartition p : partitions) {
            System.out.println(p + " offset: " + kc.position(p));
          }
          System.out.println("cannot reach here");
          kc.close();
        }
      }
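
As a caller-side illustration, a blocking call like the `kc.position(p)` above can be guarded by running it on a worker thread and waiting with a timeout. The sketch below uses only java.util.concurrent and simulates the hang with a sleep rather than a real consumer; `TimedCall` and `callWithTimeout` are hypothetical names, not Kafka APIs. It is an assumption-laden sketch, not a fix: KafkaConsumer is not safe for multi-threaded access, so a real application would have to route every consumer call through the same worker thread, and a call that ignores interruption keeps running in the background after the timeout.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Kafka: bounds how long the caller waits
// for a blocking call.
final class TimedCall {

  // Runs `call` on a daemon worker thread and waits at most timeoutMs for
  // the result. Throws TimeoutException if the call has not finished by then.
  static <T> T callWithTimeout(Callable<T> call, long timeoutMs)
      throws Exception {
    ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "timed-call");
      t.setDaemon(true); // do not keep the JVM alive if the call never returns
      return t;
    });
    Future<T> future = worker.submit(call);
    try {
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // interrupts the worker; the call may ignore it
      throw e;
    } finally {
      worker.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    try {
      // Stand-in for a position lookup that never returns.
      callWithTimeout(() -> {
        Thread.sleep(60_000);
        return 0L;
      }, 200);
    } catch (TimeoutException e) {
      System.out.println("position lookup timed out");
    }
  }
}
```

The caller regains control after the timeout, but the underlying call is only interrupted, not guaranteed to stop, so this bounds the wait rather than removing the hang itself.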
      

              People

              • Assignee:
                hachikuji Jason Gustafson
                Reporter:
                zsxwing Shixiong Zhu
              • Votes:
                13
                Watchers:
                18
