Affects Version/s: 0.11.0.0, 0.11.0.1
Environment: openjdk version "1.8.0_144"
OpenJDK Runtime Environment (Zulu 184.108.40.206-macosx) (build 1.8.0_144-b01)
OpenJDK 64-Bit Server VM (Zulu 220.127.116.11-macosx) (build 25.144-b01, mixed mode)
Kafka can lose data published by a transactional KafkaProducer under some circumstances, i.e., data that should be committed atomically may not be fully visible from a consumer with read_committed isolation level.
Steps to reproduce:
- Set transaction.timeout.ms to a low value such as 100
- Publish two messages in one transaction to different partitions of a topic, with a sufficiently long pause between the messages (e.g., 70 s).
- Observe that only the second message is visible to a consumer with the read_committed isolation level.
See https://github.com/GJL/kafka011-transactional-producer-bug-demo/blob/master/src/main/java/com/garyyao/App.java for a full example. Detailed instructions can be found in the README.md: https://github.com/GJL/kafka011-transactional-producer-bug-demo
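For orientation, the client settings behind the steps above can be sketched as plain java.util.Properties. This is a minimal sketch, not the code from the linked repository: the config keys are standard Kafka client settings, while the broker address, transactional.id and group.id values are placeholders assumed for illustration.

```java
import java.util.Properties;

class ReproConfig {
    // Producer settings. "localhost:9092" and "repro-producer" are
    // placeholder values (assumptions), not taken from the report.
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        p.setProperty("transactional.id", "repro-producer");
        // Low transaction timeout, as in the first reproduction step.
        p.setProperty("transaction.timeout.ms", "100");
        p.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    // Consumer settings. "repro-consumer" is a placeholder group id.
    static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        p.setProperty("group.id", "repro-consumer");
        // Only messages from committed transactions should be returned.
        p.setProperty("isolation.level", "read_committed");
        p.setProperty("auto.offset.reset", "earliest");
        p.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("transaction.timeout.ms"));
        System.out.println(consumerProps().getProperty("isolation.level"));
    }
}
```

These properties would be passed to the KafkaProducer and KafkaConsumer constructors; the transactional send logic itself is in the linked App.java.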
Why is this possible?
Because the transaction timeout is set to a low value, the transaction will be rolled back quickly after the first message is sent. Indeed, in the broker the following logs could be found:
After rollback, the second message is sent to a different partition than the first message.
Upon transaction commit, org.apache.kafka.clients.producer.internals.TransactionManager may enqueue the request addPartitionsToTransactionHandler:
As can be seen, the condition is fulfilled if newPartitionsInTransaction is non-empty. I suspect that, because the second message goes to a different partition than the first, this condition is satisfied.
In KafkaApis.scala, I can see that handleAddPartitionToTxnRequest may eventually call TransactionMetadata#prepareAddPartitions:
Note that the method's first argument, newState, is always Ongoing here. I suspect that this puts the transaction, which should be aborted, back into the Ongoing state.
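The suspected sequence can be illustrated with a toy state model. This is not Kafka's code: the class TxnStateSketch, its methods and the reduced set of states are hypothetical, and the sketch only assumes what is hypothesized above, namely that adding partitions always moves the transaction to Ongoing, whatever its current state.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Toy model of the suspected behaviour; all names are hypothetical.
class TxnStateSketch {
    enum State { EMPTY, ONGOING, COMPLETE_ABORT }

    State state = State.EMPTY;
    final Set<Integer> partitions = new HashSet<>();

    // Models an unguarded "add partitions" transition: the target state
    // is always ONGOING, regardless of the current state.
    void addPartitionsUnguarded(Set<Integer> added) {
        partitions.addAll(added);
        state = State.ONGOING;
    }

    // Models the coordinator aborting the transaction after the timeout
    // and forgetting the partitions that were part of it.
    void timeoutAbort() {
        partitions.clear();
        state = State.COMPLETE_ABORT;
    }

    public static void main(String[] args) {
        TxnStateSketch txn = new TxnStateSketch();
        txn.addPartitionsUnguarded(Collections.singleton(0)); // first message
        txn.timeoutAbort();                                   // timeout elapses
        txn.addPartitionsUnguarded(Collections.singleton(1)); // second message
        // The aborted transaction is Ongoing again and tracks only partition 1.
        System.out.println(txn.state + " " + txn.partitions);
    }
}
```

In this model, a subsequent commit would cover only partition 1, which would match the observation that only the second message is visible under read_committed.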