  Kafka / KAFKA-14020

Performance regression in Producer



    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 3.3.0
    • Fix Version/s: 3.3.0
    • Component/s: producer
    • Labels: None


      https://github.com/apache/kafka/commit/f7db6031b84a136ad0e257df722b20faa7c37b8a introduced a roughly 10% performance regression in the KafkaProducer under the default configuration.


      The context for this result is a benchmark that we run for Kafka Streams. The benchmark provisions 5 independent AWS clusters, each with one broker node and one client node, both on i3.large instances. During a benchmark run, we first run the Producer for 10 minutes to generate test data, and then we run Kafka Streams under a number of configurations to measure its performance.

      Our observation was a 10% regression in throughput under the simplest configuration, in which Streams simply consumes from a topic and does nothing else. That benchmark actually runs faster than the producer that generates the test data, so its throughput is bounded by the data generator's throughput. After investigation, we realized that the regression was in the data generator, not in the consumer or in Streams.

      We have numerous benchmark runs leading up to the commit in question, and they all show a throughput in the neighborhood of 115,000 records per second. We also have 40 runs including and after that commit, and they all show a throughput in the neighborhood of 105,000 records per second. A test on trunk with the commit reverted shows a return to around 115,000 records per second.
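For reference (my arithmetic, not from the ticket), the drop from roughly 115,000 to roughly 105,000 records per second works out to a bit under 10%:

```java
public class RegressionMath {
    public static void main(String[] args) {
        // Approximate throughputs reported in the ticket (records/second).
        double before = 115_000.0;
        double after = 105_000.0;
        // Relative regression: (old - new) / old, as a percentage.
        double pct = (before - after) / before * 100.0;
        // Round to one decimal place without locale-dependent formatting.
        System.out.println("regression: " + Math.round(pct * 10) / 10.0 + "%");
        // prints "regression: 8.7%"
    }
}
```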


      Here is the producer config used by the data generator:

      final Properties properties = new Properties();
      properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
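Presumably these properties are assembled by the producerConfig(broker) helper that the produce loop below passes to the KafkaProducer constructor; a minimal sketch (the helper's name comes from the loop code, but its exact shape is an assumption):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class DataGenConfig {
    // Sketch of the producerConfig(broker) helper referenced by the produce loop.
    // Everything else is left at its default, which is the configuration under
    // which the regression was observed.
    static Properties producerConfig(final String broker) {
        final Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return properties;
    }
}
```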

      Here's the producer code in the data generator. Our tests were running with three produceThreads.

      for (int t = 0; t < produceThreads; t++) {
          futures.add(executorService.submit(() -> {
              int threadTotal = 0;
              long lastPrint = start;
              final long printInterval = Duration.ofSeconds(10).toMillis();
              long now;
              try (final org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(producerConfig(broker))) {
                  while (limit > (now = System.currentTimeMillis()) - start) {
                      for (int i = 0; i < 1000; i++) {
                          final String key = keys.next();
                          final String data = dataGen.generate();
                          producer.send(new ProducerRecord<>(topic, key, valueBuilder.apply(key, data)));
                          threadTotal++;
                      }
                      if ((now - lastPrint) > printInterval) {
                          System.out.println(Thread.currentThread().getName() + " produced " + numberFormat.format(threadTotal) + " to " + topic + " in " + Duration.ofMillis(now - start));
                          lastPrint = now;
                      }
                  }
                  System.out.println(Thread.currentThread().getName() + " finished (" + numberFormat.format(threadTotal) + ") in " + Duration.ofMillis(now - start));
              }
          }));
      }

      As you can see, this is a very basic usage.





              Assignee: Artem Livshits (alivshits)
              Reporter: John Roesler (vvcephei)