Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-6119

Silent Data Loss in Kafka011 Transactional Producer



    • Bug
    • Status: Resolved
    • Blocker
    • Resolution: Fixed
    •, 1.0.0
    • core, producer
    • openjdk version "1.8.0_144"
      OpenJDK Runtime Environment (Zulu (build 1.8.0_144-b01)
      OpenJDK 64-Bit Server VM (Zulu (build 25.144-b01, mixed mode)


      Kafka can lose data published by a transactional KafkaProducer under some circumstances, i.e., data that should be committed atomically may not be fully visible from a consumer with read_committed isolation level.

      Steps to reproduce:

      1. Set transaction.timeout.ms to a low value such as 100
      2. Publish two messages in one transaction to different partitions of a topic with a sufficiently long time in-between the messages (e.g., 70 s).
      3. Only the second message is visible with read_committed isolation level.

      https://github.com/GJL/kafka011-transactional-producer-bug-demo/blob/master/src/main/java/com/garyyao/App.java for a full example. Detailed instructions can be found in the README.md: https://github.com/GJL/kafka011-transactional-producer-bug-demo

      Why is this possible?
      Because the transaction timeout is set to a low value, the transaction will be rolled back quickly after the first message is sent. Indeed, in the broker the following logs could be found:

      [2017-10-25 22:54:58,224] INFO [Transaction Coordinator 0]: Initialized transactionalId test-producer-1508964897483 with producerId 5 and producer epoch 0 on partition __transaction_state-10 (kafka.coordinator.transaction.TransactionCoordinator)
      [2017-10-25 22:55:24,260] INFO [Transaction Coordinator 0]: Completed rollback ongoing transaction of transactionalId: test-producer-1508964897483 due to timeout (kafka.coordinator.transaction.TransactionCoordinator)

      After rollback, the second message is sent to a different partition than the first message.
      Upon, transaction commit, org.apache.kafka.clients.producer.internals.TransactionManager may enqueue the request addPartitionsToTransactionHandler:

      private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
              if (!newPartitionsInTransaction.isEmpty())
              EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
                      producerIdAndEpoch.epoch, transactionResult);
              EndTxnHandler handler = new EndTxnHandler(builder);
              return handler.result;

      As can be seen, the condition is fulfilled if newPartitionsInTransaction is non-empty. I suspect because the second message goes to a different partition, this condition is satisfied.

      In KafkaApis.scala, I can see that handleAddPartitionToTxnRequest may eventually call TransactionMetadata#prepareAddPartitions:

       def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = {
          val newTxnStartTimestamp = state match {
            case Empty | CompleteAbort | CompleteCommit => updateTimestamp
            case _ => txnStartTimestamp
          prepareTransitionTo(Ongoing, producerId, producerEpoch, txnTimeoutMs, (topicPartitions ++ addedTopicPartitions).toSet,
            newTxnStartTimestamp, updateTimestamp)

      Note that the method's first argument newState of is always Ongoing here. I suspect that this puts the transaction, which should be aborted, to Ongoing again.


        Issue Links



              apurva Apurva Mehta
              gyao Gary Y.
              0 Vote for this issue
              7 Start watching this issue