Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-8100

kafka consumer not refresh metadata for dynamic topic deletion

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.1.1, 2.1.1
    • Fix Version/s: None
    • Component/s: clients
    • Labels:
      None

      Description

      Recently we used flink to consume kafka topics with a regex pattern. It is found that when we deleted some unused topics, the logs will keep flushing UNKNOWN_TOPIC_EXCEPTION.

      I study the source code of kafka client, it is found that for consumer, topicExpiry is disable in Metadata, which leads to that the even the topic deleted, the client still have this topic info in the metadata's topic list and keep fetching from servers.

      Is there any good method to avoid this annoying warning logs without modify the kafka's source code? (We still need the 'Real' Unknown topic exception, which means not the outdated topic, in logs)

      The following code can be used to reproduce this problem (if you create multiple topics such as "test1", "test2", "test3"..."testn" in kafka cluster and then delete any of one while running).

      public static void main(String [] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092\n");
              props.put("group.id", "test10");
              props.put("enable.auto.commit", "true");
              props.put("auto.commit.interval.ms", "1000");
              props.put("auto.offset.reset", "earliest");
              props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("metadata.max.age.ms", "60000");
              KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
              class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
      
                  private KafkaConsumer<String, String> consumer;
      
                  public PartitionOffsetAssignerListener(KafkaConsumer kafkaConsumer) {
                      this.consumer = kafkaConsumer;
                  }
      
                  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
      
                  }
      
                  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                      //reading all partitions from the beginning
                      consumer.seekToBeginning(partitions);
                  }
              }
      
              consumer.subscribe(Pattern.compile("^test.*$"), new PartitionOffsetAssignerListener(consumer));
      
              while (true) {
                  ConsumerRecords<String, String> records = consumer.poll(100);
                  for (ConsumerRecord<String, String> record : records) {
                      System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                  }
              }
      }
      

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              ysn2233 Shengnan YU
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated: