Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-7519

Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.1, 2.1.0
    • Component/s: core, producer
    • Labels:
      None

      Description

       

      After digging into a case where an exactly-once streams process was bizarrely unable to process incoming data, we observed the following:

      • StreamThreads stalling while creating a producer, eventually resulting in no consumption by that streams process. Looking into those threads, we found they were stuck in a loop, sending InitProducerIdRequests and always receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. These requests always had the same transactional id.
      • After changing the streams process to not use exactly-once, it was able to process messages with no problems.
      • Alternatively, changing the applicationId for that streams process, it was able to process with no problems.
      • Every hour,  every broker would fail the task `transactionalId-expiration` with the following error:
        • {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing transaction state transition to Dead while it already a pending sta
          te Dead
              at kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
              at kafka.coordinator
          .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
              at kafka.coordinator.transaction.TransactionStateManager$$a
          nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
          a:151)
              at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
          nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
              at
           kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
              at kafka.coordinator.transaction.TransactionSt
          ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
          ala:150)
              at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
          nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
          Like.scala:234)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
              at scala.collection.immutable.Li
          st.foreach(List.scala:392)
              at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
              at scala.collection.immutable.Li
          st.map(List.scala:296)
              at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
          ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
              at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
          eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
              at scala.collection.Traversabl
          eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
              at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
          scala:241)
              at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
              at scala.collection.mutable.HashMap$$anon
          fun$foreach$1.apply(HashMap.scala:130)
              at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
              at scala.collec
          tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
              at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
              at scala.collecti
          on.TraversableLike$class.flatMap(TraversableLike.scala:241)
              at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
              a
          t kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
          ansactionStateManager.scala:142)
              at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
          nonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
              at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enable
          TransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
              at kafka.utils.CoreUtils$.inLock(CoreUtils
          .scala:251)
              at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
              at kafka.coordinator.transaction.TransactionStateManager$$anon
          fun$enableTransactionalIdExpiration$1.apply$mcV$sp(TransactionStateManager.scala:140)
              at kafka.utils.KafkaScheduler$$anonfun$1.apply$mc
          V$sp(KafkaScheduler.scala:114)
              at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
              at java.util.concurrent.Executors$RunnableAd
          apter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
              at java.util.concurrent.Scheduled
          ThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
              at java.util.concurrent.ScheduledThreadPoolExec
          utor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecu
          tor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.jav
          a:748)","exception_class":"java.lang.IllegalStateException","exception_message":"Preparing transaction state transition to Dead while it a
          lready a pending state Dead"},"source_host":"kafka-broker-4.kafka-broker.default.svc.cluster.local","method":"error","level":"ERROR","message":"Uncaught exception in scheduled task transactionalId-expiration","mdc":{},"file":"Logging.scala","line_number":"76","thread_name":"transaction-log-manager-0","logger_name":"kafka.utils.KafkaScheduler","class<span class="code-quote">":"kafka.utils.Logging$class"}

      Based on these problems and having read a bit of the server source, I guessed that this would all be explained by there being TransactionMetadata instances that are stuck in a pendingState.

      After doing a heap dump of the broker that was returning the error for our particular group, we found this:

      There were indeed a bunch of live TransactionMetadata instances that had a pending state of  "Dead" but should have already been cleaned up, confirming my guess.

      Finally, after reading carefully through the TransactionStateManager callback for producing tombstones for expired transactional ids I noticed that if there is any error returned by the ReplicaManager, those transactions will not have their pending state cleared.

      Short summary:

      If the ReplicaManager fails to append the tombstone records for expiring a transactional id (in my case, this likely happened during a rebalance that wasn't properly rate limited), the broker fails to clear it's pending state for that transactional id, blocking any future actions on that transactional id (including cleanup), until the broker is restarted or another broker without that problem becomes the coordinator for that transactional id.


      Related:
      There was a very similar case in KAFKA-5351 where not clearing a TransactionMetadata's pendingState caused similar issues.

        Attachments

        1. image-2018-10-18-13-02-22-371.png
          207 kB
          Bridger Howell
        2. KAFKA-7519.patch
          6 kB
          Bridger Howell

          Issue Links

            Activity

              People

              • Assignee:
                howellbridger Bridger Howell
                Reporter:
                howellbridger Bridger Howell
              • Votes:
                0 Vote for this issue
                Watchers:
                7 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: