  1. Kafka
  2. KAFKA-4024

First metadata update always takes retry.backoff.ms milliseconds to complete


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9.0.1, 0.10.0.0
    • Fix Version/s: 0.10.2.0
    • Component/s: clients
    • Labels: None

    Description

      Recently I updated our KafkaProducer configuration; specifically, we raised retry.backoff.ms from the default (100 ms) to 1000 ms.
      After that, we observed that the first send() started taking longer than before. We investigated and found the following.

      Environment:

      • Kafka broker 0.9.0.1
      • Kafka producer 0.9.0.1

      We currently run 0.9.0.1, but the issue also reproduces with the latest build from the trunk branch.

      TL;DR

      The first KafkaProducer.send() always blocks for retry.backoff.ms milliseconds, because a backoff is unintentionally applied to the first metadata update.

      Proof

      I wrote the following test code and placed it under clients/main/java/:

      import java.util.Properties;
      import java.util.concurrent.TimeUnit;
      
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.Producer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.serialization.ByteArraySerializer;
      
      public final class KafkaProducerMetadataUpdateDurationTest {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
              // Read the backoff from -Dretry.backoff.ms; fall back to the client default (100 ms)
              // so that a missing property does not cause a NullPointerException in setProperty().
              String retryBackoffMs = System.getProperty("retry.backoff.ms", "100");
              System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
              props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
      
              Producer<byte[], byte[]> producer =
                      new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
      
              // Measure how long the first metadata fetch for topic "test" blocks.
              long t0 = System.nanoTime();
              try {
                  producer.partitionsFor("test");
                  long duration = System.nanoTime() - t0;
                  System.err.println("Duration = " + TimeUnit.NANOSECONDS.toMillis(duration) + " ms");
              } finally {
                  producer.close();
              }
          }
      }
      

      Here is the experiment log:

      # Start zookeeper & kafka broker
      ./bin/zookeeper-server-start.sh config/zookeeper.properties
      ./bin/kafka-server-start.sh config/server.properties
      
      # Create test topic
      ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 1 --partitions 1
      
      $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=100 KafkaProducerMetadataUpdateDurationTest
      Experimenting with retry.backoff.ms = 100
      Duration = 175 ms
      
      $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=1000 KafkaProducerMetadataUpdateDurationTest
      Experimenting with retry.backoff.ms = 1000
      Duration = 1066 ms
      
      $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=10000 KafkaProducerMetadataUpdateDurationTest
      Experimenting with retry.backoff.ms = 10000
      Duration = 10070 ms
      

      As you can see, the duration of partitionsFor() grows linearly with retry.backoff.ms: each run takes roughly the configured backoff plus about 70 ms.

      Here is the sequence of events that leads to this behavior:
      1. KafkaProducer initializes the metadata with the given bootstrap.servers and the current timestamp: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L276
      2. On the first send(), KafkaProducer requests a metadata update because partition info is missing: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L527
      3. However, DefaultMetadataUpdater does not actually send a MetadataRequest, because metadata.timeToNextUpdate returns a value larger than zero: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L541-L548
      4. Metadata.timeToNextUpdate returns the larger of the time until metadata expiration and the time until the backoff expires. In practice needUpdate is always true on the first call, so timeToAllowUpdate is always the value adopted, and it does not reach zero until retry.backoff.ms has elapsed since the first metadata.update(): https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L116

      This happens because the Kafka client tries to keep an interval of retry.backoff.ms between consecutive metadata updates. That works fine from the second update onward. For the first update, however, the client cannot yet have any real metadata (which is only obtained through a metadata update request), so backing off makes no sense there. In fact it harms our application by blocking the first send() for far too long.
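
The four steps above can be reproduced without a broker using a small model of the timing logic. This is a minimal sketch, not the actual Kafka source: the class MetadataBackoffSketch and its field names are hypothetical, and the arithmetic in timeToNextUpdate is an assumption based on the description in step 4 (the larger of the time until expiration and the time until the backoff elapses).

```java
// Hypothetical, simplified model of the client metadata timing described above.
// It is NOT the real org.apache.kafka.clients.Metadata class.
final class MetadataBackoffSketch {
    private final long refreshBackoffMs;   // models retry.backoff.ms
    private final long metadataExpireMs;   // models the metadata expiration interval
    private long lastRefreshMs;
    private long lastSuccessfulRefreshMs;
    private boolean needUpdate;

    MetadataBackoffSketch(long refreshBackoffMs, long metadataExpireMs) {
        this.refreshBackoffMs = refreshBackoffMs;
        this.metadataExpireMs = metadataExpireMs;
    }

    // Step 1: the producer seeds the metadata with the bootstrap servers and "now".
    void update(long nowMs) {
        this.needUpdate = false;
        this.lastRefreshMs = nowMs;
        this.lastSuccessfulRefreshMs = nowMs;
    }

    // Step 2: the first send() asks for a refresh because partition info is missing.
    void requestUpdate() {
        this.needUpdate = true;
    }

    // Steps 3 and 4: how long the updater waits before sending a metadata request.
    long timeToNextUpdate(long nowMs) {
        long timeToExpire = needUpdate
                ? 0
                : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        for (long backoffMs : new long[] {100, 1000, 10000}) {
            MetadataBackoffSketch metadata = new MetadataBackoffSketch(backoffMs, 300_000);
            metadata.update(0);        // bootstrap update at t = 0
            metadata.requestUpdate();  // first send() immediately afterwards
            System.out.println("retry.backoff.ms=" + backoffMs
                    + " -> first request delayed by " + metadata.timeToNextUpdate(0) + " ms");
        }
    }
}
```

In this model the bootstrap update counts as a refresh, so the very first real metadata request is delayed by the full backoff, which matches the shape of the measured durations in the experiment log.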

          People

            kawamuray Yuto Kawamura