Description
KafkaConsumer.position may hang forever if the topic is deleted while the consumer is running. The problem is this line: https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
The timeout is Long.MAX_VALUE, so the call keeps retrying forever on UnknownTopicOrPartitionException.
Here is a reproducer:
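To illustrate the difference a finite timeout makes, here is a minimal, self-contained sketch of a deadline-bounded retry loop. It is not the Fetcher code: BoundedRetry, RetriableLookupException and retryUntil are hypothetical names used only for illustration, and the failing lookup simulates a deleted topic. With an effectively unbounded deadline, the same loop would never return.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

public class BoundedRetry {

    /** Hypothetical stand-in for a retriable error such as UnknownTopicOrPartitionException. */
    static class RetriableLookupException extends RuntimeException {
        RetriableLookupException(String message) {
            super(message);
        }
    }

    /**
     * Calls lookup until it succeeds or the deadline passes.
     * Returns Optional.empty() on timeout instead of retrying forever.
     */
    static Optional<Long> retryUntil(Supplier<Long> lookup, Duration timeout, Duration backoff)
            throws InterruptedException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return Optional.of(lookup.get());
            } catch (RetriableLookupException e) {
                long remainingNanos = deadlineNanos - System.nanoTime();
                if (remainingNanos <= 0) {
                    // Deadline reached: give up rather than loop again.
                    return Optional.empty();
                }
                // Back off, but never sleep past the deadline.
                long remainingMillis = Math.max(1, remainingNanos / 1_000_000);
                Thread.sleep(Math.min(backoff.toMillis(), remainingMillis));
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulates a deleted topic: every lookup fails with a retriable error.
        Supplier<Long> deletedTopic = () -> {
            throw new RetriableLookupException("unknown topic or partition");
        };
        Optional<Long> offset =
                retryUntil(deletedTopic, Duration.ofMillis(200), Duration.ofMillis(50));
        System.out.println(offset.isPresent() ? "offset: " + offset.get() : "timed out");
    }
}
```

Run as written, the sketch prints "timed out" after roughly 200 ms, because the simulated lookup never succeeds.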
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;
import java.util.Set;

public class KafkaReproducer {

  public static void main(String[] args) {
    // Make sure "delete.topic.enable" is set to true.
    // Please create the topic test with "3" partitions manually.
    // The issue is gone when there is only one partition.
    String topic = "test";

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "testgroup");
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("enable.auto.commit", "false");

    KafkaConsumer kc = new KafkaConsumer(props);
    kc.subscribe(Collections.singletonList(topic));
    kc.poll(0);
    Set<TopicPartition> partitions = kc.assignment();
    System.out.println("partitions: " + partitions);
    kc.pause(partitions);
    kc.seekToEnd(partitions);

    System.out.println("please delete the topic in 30 seconds");
    try {
      // Sleep 30 seconds to give us enough time to delete the topic.
      Thread.sleep(30000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("sleep end");
    for (TopicPartition p : partitions) {
      System.out.println(p + " offset: " + kc.position(p));
    }
    System.out.println("cannot reach here");
    kc.close();
  }
}
Issue Links
- blocks: SPARK-18057 Update structured streaming kafka from 0.10.0.1 to 2.0.0 (Resolved)
- incorporates: KAFKA-6608 Add TimeoutException to KafkaConsumer#position() (Resolved)
- relates to: KAFKA-6127 Streams should never block infinitely (Resolved)
- links to