
KAFKA-9279: Silent data loss in Kafka producer


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 3.2.0
    • Component/s: producer
    • Labels: None

Description

      It appears that a producer.commitTransaction() call can succeed even when an individual producer.send() call has failed. The following code demonstrates the issue:

      package org.example.dataloss;
      
      import java.nio.charset.StandardCharsets;
      import java.util.Properties;
      import java.util.Random;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.common.serialization.ByteArraySerializer;
      
      public class Main {
      
          public static void main(final String[] args) {
              final Properties producerProps = new Properties();
      
              if (args.length != 2) {
                  System.err.println("Invalid command-line arguments");
                  System.exit(1);
              }
              final String bootstrapServer = args[0];
              final String topic = args[1];
      
              producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
              producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "500000");
              producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
              // Limit requests to 1 MB; the record sent below deliberately exceeds this limit
              producerProps.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1000000");
              producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
              producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "dataloss_01");
              producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dataloss_01");
      
              try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
                  producer.initTransactions();
                  producer.beginTransaction();
      
                  final Random random = new Random();
                  // 2 MB payload: larger than max.request.size, so the producer rejects this record
                  final byte[] largePayload = new byte[2000000];
                  random.nextBytes(largePayload);
                  producer.send(
                      new ProducerRecord<>(
                          topic,
                          "large".getBytes(StandardCharsets.UTF_8),
                          largePayload
                      ),
                      (metadata, e) -> {
                          if (e == null) {
                              System.out.println("INFO: Large payload succeeded");
                          } else {
                              System.err.printf("ERROR: Large payload failed: %s\n", e.getMessage());
                          }
                      }
                  );
      
                  // The commit succeeds even though the send above failed, so the
                  // record is silently dropped instead of the transaction aborting
                  producer.commitTransaction();
                  System.out.println("Commit succeeded");
      
              } catch (final Exception e) {
                  System.err.printf("FATAL ERROR: %s", e.getMessage());
              }
          }
      }
      

      The code prints the following output:

      ERROR: Large payload failed: The message is 2000093 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
      Commit succeeded
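
      Until a fix is available, one possible workaround (a minimal sketch, not part of the original report; the SafeSend class and sendOrAbort() helper names are hypothetical, and the caller is assumed to have already called beginTransaction()) is to block on the Future returned by producer.send() before calling commitTransaction(), so that a rejected record surfaces as an ExecutionException and the transaction can be aborted instead of committed:

      package org.example.dataloss;

      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.Future;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;

      public final class SafeSend {

          // Hypothetical helper: waits for the send to complete so that failures
          // such as RecordTooLargeException surface before the transaction commits.
          public static void sendOrAbort(
              final KafkaProducer<byte[], byte[]> producer,
              final ProducerRecord<byte[], byte[]> record
          ) {
              final Future<RecordMetadata> future = producer.send(record);
              try {
                  // Blocks until the send completes; throws if the record was rejected
                  future.get();
              } catch (final InterruptedException e) {
                  Thread.currentThread().interrupt();
                  producer.abortTransaction();
                  throw new IllegalStateException("Interrupted while waiting for send", e);
              } catch (final ExecutionException e) {
                  producer.abortTransaction();
                  throw new IllegalStateException("Send failed, transaction aborted", e);
              }
          }
      }

      Blocking on every Future serializes sends and defeats batching, so another option along the same lines is to record any callback error in a shared holder (for example an AtomicReference<Exception>) and abort the transaction before commit if it has been set.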

       

Attachments

Issue Links

Activity

People

    Assignee: Chris Egerton
    Reporter: Andrew Klopper
    Votes: 5
    Watchers: 7

Dates

    Created:
    Updated:
    Resolved: