Kafka / KAFKA-3450

Producer blocks on send to topic that doesn't exist if auto create is disabled


Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 0.9.0.1
    • Fix Version/s: None
    • Component/s: producer
    • Labels: None

    Description

      producer.send() blocks for max.block.ms (default 60 seconds) if the destination topic doesn't exist and automatic topic creation is disabled. A warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION is logged every 100 ms in a loop until the 60-second timeout expires, even though the operation is not recoverable.
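
      To put a number on the warning loop described above, here is a rough back-of-the-envelope sketch. It assumes the 100 ms interval stated in this report and simply divides the blocking window by it; the class and method names are hypothetical and not part of the Kafka client.

```java
public class MetadataRetryEstimate {

    /**
     * Rough upper bound on the number of metadata attempts (and therefore
     * UNKNOWN_TOPIC_OR_PARTITION warnings) that fit into one blocked send().
     * Assumption: one attempt per retry interval, as observed in this report.
     */
    static long expectedMetadataAttempts(long maxBlockMs, long retryIntervalMs) {
        if (maxBlockMs < 0 || retryIntervalMs <= 0) {
            throw new IllegalArgumentException(
                    "maxBlockMs must be >= 0 and retryIntervalMs must be > 0");
        }
        return maxBlockMs / retryIntervalMs;
    }

    public static void main(String[] args) {
        // max.block.ms = 1000, as used in the example code of this report
        System.out.println(expectedMetadataAttempts(1_000, 100));  // prints 10
        // max.block.ms = 60000, the default mentioned in the description
        System.out.println(expectedMetadataAttempts(60_000, 100)); // prints 600
    }
}
```

      The first figure agrees with the related output in this report, where each send() produces ten warnings before the TimeoutException. With the default setting, the same loop would log on the order of 600 warnings per blocked send().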

      Preconditions

      • Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
      • Kafka 0.9.0.1 clients.

      Minimal example code

      https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java

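      import java.util.Properties;

      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.Producer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.common.serialization.StringSerializer;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

      /**
       * Test of sending to a topic that does not exist while automatic creation of topics is disabled in Kafka (auto.create.topics.enable=false).
       */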
      public class NoSuchTopicTest {
          private static final Logger LOGGER = LoggerFactory.getLogger(NoSuchTopicTest.class);
      
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, NoSuchTopicTest.class.getSimpleName());
              properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); // Default is 60 seconds
      
              try (Producer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
                  LOGGER.info("Sending message");
                  producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", "key", "value"), (metadata, exception) -> {
                      if (exception != null) {
                          LOGGER.error("Send failed: {}", exception.toString());
                      } else {
                          LOGGER.info("Send successful: {}-{}/{}", metadata.topic(), metadata.partition(), metadata.offset());
                      }
                  });
      
                  LOGGER.info("Sending message");
                  producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo", "key", "value"), (metadata, exception) -> {
                      if (exception != null) {
                          LOGGER.error("Send failed: {}", exception.toString());
                      } else {
                          LOGGER.info("Send successful: {}-{}/{}", metadata.topic(), metadata.partition(), metadata.offset());
                      }
                  });
              }
          }
      }
      

      Related output

      2016-03-23 12:44:37.725 INFO  c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: Sending message (NoSuchTopicTest.java:26)
      2016-03-23 12:44:37.830 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 0 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:37.928 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 1 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:38.028 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 2 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:38.130 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 3 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:38.231 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 4 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:38.332 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 5 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:38.433 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 6 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:38.534 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 7 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:38.635 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 8 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:38.736 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 9 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:38.772 ERROR c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: Send failed: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 35 ms. (NoSuchTopicTest.java:29)
      2016-03-23 12:44:38.773 INFO  c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: Sending message (NoSuchTopicTest.java:35)
      2016-03-23 12:44:38.837 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 10 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:38.938 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 11 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:39.039 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 12 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:39.140 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 13 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:39.242 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 14 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:39.345 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 15 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:39.447 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 16 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:39.549 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 17 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:39.651 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 18 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:39.752 WARN  o.a.kafka.clients.NetworkClient     [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 19 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
      2016-03-23 12:44:39.774 ERROR c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: Send failed: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 21 ms. (NoSuchTopicTest.java:38)
      2016-03-23 12:44:39.774 INFO  o.a.k.c.producer.KafkaProducer      [main]: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (KafkaProducer.java:613)
      

      Known workaround

      • Set max.block.ms = 0 in the producer configuration to prevent blocking and make send() return immediately. Be careful, though: I'm not sure whether this is safe or whether it could cause something even worse.
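
      A minimal sketch of how the workaround above could be expressed as producer configuration. It uses only java.util.Properties with the plain string keys "bootstrap.servers" and "max.block.ms" so that it compiles without the Kafka client on the classpath; the class and method names are hypothetical. Whether a value of 0 is safe in production is left open, exactly as in the note above.

```java
import java.util.Properties;

public class NonBlockingProducerProps {

    /**
     * Builds producer properties with an explicit max.block.ms.
     * Passing 0 corresponds to the workaround described in this report.
     */
    static Properties producerProperties(String bootstrapServers, long maxBlockMs) {
        if (maxBlockMs < 0) {
            throw new IllegalArgumentException("maxBlockMs must be >= 0");
        }
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("max.block.ms", Long.toString(maxBlockMs));
        return properties;
    }

    public static void main(String[] args) {
        // Broker address taken from the example code in this report.
        Properties properties = producerProperties("localhost:9092", 0);
        System.out.println(properties.getProperty("max.block.ms")); // prints 0
    }
}
```

      The resulting Properties object can be passed to the KafkaProducer constructor in the same way as in the example code, in place of the "1000" value set there.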

            People

              Assignee: Unassigned
              Reporter: Michal Turek (turek@avast.com)
              Votes: 12
              Watchers: 29

              Dates

                Created:
                Updated: