Description
It appears that it is possible for a producer.commitTransaction() call to succeed even if an individual producer.send() call has failed. The following code demonstrates the issue:
package org.example.dataloss;

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class Main {

    public static void main(final String[] args) {
        final Properties producerProps = new Properties();

        if (args.length != 2) {
            System.err.println("Invalid command-line arguments");
            System.exit(1);
        }
        final String bootstrapServer = args[0];
        final String topic = args[1];

        producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "500000");
        producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
        producerProps.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1000000");
        producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "dataloss_01");
        producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dataloss_01");

        try (final KafkaProducer<byte[], byte[]> producer =
                 new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
            producer.initTransactions();
            producer.beginTransaction();

            final Random random = new Random();
            final byte[] largePayload = new byte[2000000];
            random.nextBytes(largePayload);

            producer.send(
                new ProducerRecord<>(
                    topic,
                    "large".getBytes(StandardCharsets.UTF_8),
                    largePayload
                ),
                (metadata, e) -> {
                    if (e == null) {
                        System.out.println("INFO: Large payload succeeded");
                    } else {
                        System.err.printf("ERROR: Large payload failed: %s\n", e.getMessage());
                    }
                }
            );

            producer.commitTransaction();
            System.out.println("Commit succeeded");
        } catch (final Exception e) {
            System.err.printf("FATAL ERROR: %s", e.getMessage());
        }
    }
}
The code prints the following output:
ERROR: Large payload failed: The message is 2000093 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
Commit succeeded
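One possible application-side guard, sketched here as an assumption and not as a fix for the behavior reported in this issue, is to keep the Future that each producer.send() call returns and check all of them before committing. The helper below uses only java.util.concurrent so it can be run without a broker; the class name SendGuard and the method awaitAll are hypothetical names introduced for illustration.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper (not part of the Kafka client API).
public class SendGuard {

    /**
     * Blocks until every future in the list has completed.
     * If any of them failed, rethrows the underlying cause so the
     * caller can abort instead of committing.
     */
    public static void awaitAll(final List<? extends Future<?>> sends) throws Exception {
        for (final Future<?> send : sends) {
            try {
                send.get();
            } catch (final ExecutionException e) {
                final Throwable cause = e.getCause();
                // Surface the original exception when it is a checked or runtime exception.
                throw (cause instanceof Exception) ? (Exception) cause : e;
            }
        }
    }
}
```

In the reproduction above, this would mean collecting the Future returned by producer.send(...) into a list, calling SendGuard.awaitAll(...) immediately before producer.commitTransaction(), and calling producer.abortTransaction() if it throws. Blocking on every future costs some throughput, so this is a trade-off and not a general recommendation.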
Issue Links
- causes: KAFKA-15259 Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once (Open)
- is duplicated by: KAFKA-10334 Transactions not working properly (Resolved)