Details
- Type: Bug
- Status: Resolved
- Priority: Blocker
- Resolution: Fixed
- Affects Version/s: 0.11.0.0, 0.11.0.1
- Environment:
openjdk version "1.8.0_144"
OpenJDK Runtime Environment (Zulu 8.23.0.3-macosx) (build 1.8.0_144-b01)
OpenJDK 64-Bit Server VM (Zulu 8.23.0.3-macosx) (build 25.144-b01, mixed mode)
Description
Kafka can lose data published by a transactional KafkaProducer under some circumstances: data that should be committed atomically may not be fully visible to a consumer using the read_committed isolation level.
Steps to reproduce:
- Set transaction.timeout.ms to a low value such as 100
- Publish two messages in one transaction to different partitions of a topic with a sufficiently long time in-between the messages (e.g., 70 s).
- Only the second message is visible with read_committed isolation level.
See https://github.com/GJL/kafka011-transactional-producer-bug-demo/blob/master/src/main/java/com/garyyao/App.java for a full example. Detailed instructions can be found in the README.md: https://github.com/GJL/kafka011-transactional-producer-bug-demo
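The client settings implied by the steps above can be sketched as follows. This is only an illustration using java.util.Properties, not the code from the linked demo: the class name ReproConfig, the broker address, the transactional id, and the group id are placeholders I am assuming, while the configuration keys are the standard Kafka client settings.

```java
import java.util.Properties;

/** Sketch of the client settings for the reproduction; all concrete values are placeholders. */
class ReproConfig {

    // Producer side: transactional producer with a deliberately short transaction timeout.
    static Properties producerProps(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("transactional.id", transactionalId);
        // Step 1 of the reproduction: a low transaction timeout (in milliseconds).
        props.setProperty("transaction.timeout.ms", "100");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Consumer side: only committed transactional data should be visible.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("isolation.level", "read_committed");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address and ids, assumed for illustration only.
        System.out.println(producerProps("localhost:9092", "test-producer"));
        System.out.println(consumerProps("localhost:9092", "test-consumer"));
    }
}
```

These Properties objects would be passed to the KafkaProducer and KafkaConsumer constructors. The two sends, the pause between them, and the commit are left to the linked demo.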
Why is this possible?
Because the transaction timeout is set to a low value, the transaction will be rolled back quickly after the first message is sent. Indeed, in the broker the following logs could be found:
[2017-10-25 22:54:58,224] INFO [Transaction Coordinator 0]: Initialized transactionalId test-producer-1508964897483 with producerId 5 and producer epoch 0 on partition __transaction_state-10 (kafka.coordinator.transaction.TransactionCoordinator)
[2017-10-25 22:55:24,260] INFO [Transaction Coordinator 0]: Completed rollback ongoing transaction of transactionalId: test-producer-1508964897483 due to timeout (kafka.coordinator.transaction.TransactionCoordinator)
After rollback, the second message is sent to a different partition than the first message.
Upon transaction commit, org.apache.kafka.clients.producer.internals.TransactionManager may enqueue the request addPartitionsToTransactionHandler:
    private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
        if (!newPartitionsInTransaction.isEmpty())
            enqueueRequest(addPartitionsToTransactionHandler());
        EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
                producerIdAndEpoch.epoch, transactionResult);
        EndTxnHandler handler = new EndTxnHandler(builder);
        enqueueRequest(handler);
        return handler.result;
    }
As can be seen, the condition is fulfilled if newPartitionsInTransaction is non-empty. I suspect that this condition is satisfied because the second message goes to a different partition, one that has not yet been added to the transaction.
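To make the suspected request ordering concrete, here is a toy model of the client-side bookkeeping. It is not the real TransactionManager: the class ToyTransactionManager, its methods, and the string request names are hypothetical, and the model assumes that a partition stays in newPartitionsInTransaction until an add-partitions request has been sent for it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

/** Toy model of the request ordering on commit; a sketch, not Kafka's TransactionManager. */
class ToyTransactionManager {
    // Partitions written to but not yet registered with the coordinator.
    private final Set<String> newPartitionsInTransaction = new HashSet<>();
    // Partitions already registered with the coordinator.
    private final Set<String> partitionsInTransaction = new HashSet<>();
    private final Queue<String> pendingRequests = new ArrayDeque<>();

    // Called when a record is sent to a partition.
    void maybeAddPartition(String topicPartition) {
        if (!partitionsInTransaction.contains(topicPartition)) {
            newPartitionsInTransaction.add(topicPartition);
        }
    }

    // Models the add-partitions request having been sent for all pending partitions.
    void flushAddPartitions() {
        partitionsInTransaction.addAll(newPartitionsInTransaction);
        newPartitionsInTransaction.clear();
    }

    // Mirrors the quoted condition: add-partitions is queued ahead of EndTxn
    // only when unregistered partitions exist.
    List<String> beginCommit() {
        if (!newPartitionsInTransaction.isEmpty()) {
            pendingRequests.add("AddPartitionsToTxn");
        }
        pendingRequests.add("EndTxn(COMMIT)");
        return new ArrayList<>(pendingRequests);
    }

    public static void main(String[] args) {
        ToyTransactionManager tm = new ToyTransactionManager();
        tm.maybeAddPartition("topic-0"); // first message
        tm.flushAddPartitions();         // registered before the long pause
        tm.maybeAddPartition("topic-1"); // second message, different partition
        System.out.println(tm.beginCommit());
    }
}
```

In this model, a second message sent to the same partition as the first would leave newPartitionsInTransaction empty, so only the EndTxn request would be queued.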
In KafkaApis.scala, I can see that handleAddPartitionToTxnRequest may eventually call TransactionMetadata#prepareAddPartitions:
    def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = {
      val newTxnStartTimestamp = state match {
        case Empty | CompleteAbort | CompleteCommit => updateTimestamp
        case _ => txnStartTimestamp
      }
      prepareTransitionTo(Ongoing, producerId, producerEpoch, txnTimeoutMs,
        (topicPartitions ++ addedTopicPartitions).toSet, newTxnStartTimestamp, updateTimestamp)
    }
Note that the first argument passed to prepareTransitionTo, newState, is always Ongoing here. I suspect that this puts the transaction, which should remain aborted, back into the Ongoing state.
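The suspected broker-side effect can be illustrated with a small state-machine model. This is a sketch under my own assumptions, not the broker's TransactionMetadata: the class ToyTransactionMetadata and its methods are hypothetical, and the model assumes that the partition set is cleared once a transaction completes and that a commit writes markers only to the partitions currently registered.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the coordinator-side transaction state; a sketch, not Kafka's TransactionMetadata. */
class ToyTransactionMetadata {
    enum State { EMPTY, ONGOING, COMPLETE_ABORT, COMPLETE_COMMIT }

    State state = State.EMPTY;
    long txnStartTimestamp = -1L;
    final Set<String> topicPartitions = new HashSet<>();

    // Mirrors the quoted method: the target state is Ongoing regardless of the current state.
    void addPartitions(Set<String> added, long updateTimestamp) {
        switch (state) {
            case EMPTY:
            case COMPLETE_ABORT:
            case COMPLETE_COMMIT:
                txnStartTimestamp = updateTimestamp; // looks like a fresh transaction
                break;
            default:
                break; // keep the existing start timestamp
        }
        topicPartitions.addAll(added);
        state = State.ONGOING;
    }

    // Assumption: a timeout abort discards the registered partitions.
    void abortOnTimeout() {
        topicPartitions.clear();
        state = State.COMPLETE_ABORT;
    }

    // Returns the partitions that would receive a commit marker in this model.
    Set<String> commit() {
        Set<String> committed = new HashSet<>(topicPartitions);
        topicPartitions.clear();
        state = State.COMPLETE_COMMIT;
        return committed;
    }

    public static void main(String[] args) {
        ToyTransactionMetadata txn = new ToyTransactionMetadata();
        Set<String> first = new HashSet<>();
        first.add("topic-0");
        txn.addPartitions(first, 0L);       // first message
        txn.abortOnTimeout();               // timeout expires during the pause
        Set<String> second = new HashSet<>();
        second.add("topic-1");
        txn.addPartitions(second, 70_000L); // second message revives the transaction
        System.out.println(txn.state);
        System.out.println(txn.commit());
    }
}
```

In this model the commit covers only topic-1, which matches the reported symptom that only the second message is visible with read_committed. Rejecting the add-partitions request when the state is COMPLETE_ABORT would be one way to avoid the revival, but whether that matches the actual fix is not something this report states.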