KAFKA-7531 (project: Kafka)

NullPointerException (NPE) at TransactionCoordinator.handleEndTransaction

Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.2
    • Component/s: controller
    • Labels: None

    Description

      Kafka cluster with 4 brokers, 1 topic (20 partitions), and 1 ZooKeeper node.

      Kafka Streams application: 4 instances, each with 5 stream threads, for 20 stream threads in total.
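      The transactional IDs in the logs (elog_agg-0_3, elog_agg-0_9, elog_agg-0_14, elog_agg-0_17) indicate exactly-once processing, where Kafka Streams 2.0 gives each stream task its own transactional producer with transactional.id set to <application.id>-<taskId>. The stdlib-only sketch below enumerates what that means for this setup: 20 input partitions yield 20 tasks and therefore 20 transactional IDs, each of which ends its transaction with an EndTxn request on every commit. It assumes a single sub-topology (hence the 0_ prefix) and application.id = elog_agg; the class and method names are hypothetical, and this is not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: enumerates the stream tasks of one sub-topology and
// derives the per-task transactional ID used under exactly-once processing.
class StreamsTaskIds {

    // Task IDs render as "<subTopologyId>_<partition>", e.g. "0_14".
    static List<String> taskIds(int subTopologyId, int partitions) {
        List<String> ids = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            ids.add(subTopologyId + "_" + p);
        }
        return ids;
    }

    // Each task's producer uses "<application.id>-<taskId>" as its transactional.id.
    static String transactionalId(String applicationId, String taskId) {
        return applicationId + "-" + taskId;
    }

    public static void main(String[] args) {
        int instances = 4, threadsPerInstance = 5, partitions = 20;
        List<String> tasks = taskIds(0, partitions);
        for (String task : tasks) {
            System.out.println(transactionalId("elog_agg", task));
        }
        System.out.println(tasks.size() + " tasks, " + (instances * threadsPerInstance) + " threads");
    }
}
```

      With 4 instances running 5 threads each, there are as many threads as tasks, so a balanced assignment gives every thread exactly one task and one transactional producer.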

      I observe a NullPointerException (NPE) on the coordinator broker, which causes all of the application's stream threads to shut down. Here is the stack trace from the broker:

      [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe in group elog_agg has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
      [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance group elog_agg with old generation 49 (__consumer_offsets-21) (kafka.coordinator.group.GroupCoordinator)

      [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group elog_agg generation 50 (__consumer_offsets-21) (kafka.coordinator.group.GroupCoordinator)
      [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from leader for group elog_agg for generation 50 (kafka.coordinator.group.GroupCoordinator)
      [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on partition __transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
      [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=true} (kafka.server.KafkaApis)
      java.lang.NullPointerException
       at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
       at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
       at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
       at scala.util.Either$RightProjection.flatMap(Either.scala:702)
       at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
       at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
       at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
       at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
       at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
       at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
       at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
       at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
       at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
       at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
       at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
       at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
       at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
       at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
       at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
       at kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
       at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
       at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
       at java.lang.Thread.run(Thread.java:745)
      [2018-10-22 21:52:15,958] ERROR [KafkaApi-2] Error when handling request {transactional_id=elog_agg-0_9,producer_id=1005,producer_epoch=8,transaction_result=true} (kafka.server.KafkaApis)
      java.lang.NullPointerException
       at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
       at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
       at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
       at scala.util.Either$RightProjection.flatMap(Either.scala:702)
       at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
       at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
       at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
       at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
       at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
       at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
       at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
       at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
       at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
       at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
       at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
       at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
       at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
       at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
       at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
       at kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
       at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
       at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
       at java.lang.Thread.run(Thread.java:745)
      [2018-10-22 21:52:27,531] INFO [GroupCoordinator 2]: Member elog_agg-client-sswvlp6804-StreamThread-4-consumer-ae1f00c2-7c2c-4f8e-bed4-20a955ecc122 in group elog_agg has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
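      The broker log places group elog_agg on __consumer_offsets-21 and transactionalId elog_agg-0_14 on __transaction_state-16. Both coordinators pick a partition the same way: the absolute value of the ID's Java String.hashCode() modulo the partition count of the internal topic, and the leader of that partition acts as coordinator, which is why every EndTxn request for a given transactional ID lands on one broker (broker 2 for the failing requests above). The standalone sketch below re-implements that mapping; it is not broker code, and it assumes the default of 50 partitions for both __consumer_offsets (offsets.topic.num.partitions) and __transaction_state (transaction.state.log.num.partitions).

```java
// Standalone sketch of coordinator partition selection for a group ID or a
// transactional ID (hashCode modulo partition count); not the broker's code.
class CoordinatorPartition {

    // Intended to match Kafka's Utils.abs: Integer.MIN_VALUE maps to 0
    // instead of staying negative under Math.abs.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    static int partitionFor(String id, int numPartitions) {
        return abs(id.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int defaultPartitions = 50; // assumed default for both internal topics
        System.out.println("__consumer_offsets-" + partitionFor("elog_agg", defaultPartitions));
        System.out.println("__transaction_state-" + partitionFor("elog_agg-0_14", defaultPartitions));
    }
}
```

      With those defaults it prints __consumer_offsets-21 and __transaction_state-16, matching the partitions named in the broker log.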

      On the application side, I see the following stack trace:

      2018-10-22 21:52:15 AssignedStreamsTasks [ERROR] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Failed to commit stream task 0_9 due to the following error:
      org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The server experienced an unexpected error when processing the request
      at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
      at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
      at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
      at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
      at java.lang.Thread.run(Thread.java:745)
      2018-10-22 21:52:15 StreamThread [INFO] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] State transition from RUNNING to PENDING_SHUTDOWN
      2018-10-22 21:52:15 StreamThread [INFO] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Shutting down
      2018-10-22 21:52:15 KafkaProducer [INFO] [Producer clientId=elog_agg-client-sswvlp6802-StreamThread-4-0_17-producer, transactionalId=elog_agg-0_17] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
      2018-10-22 21:52:16 AssignedStreamsTasks [ERROR] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Failed while closing StreamTask 0_9 due to the following error:
      org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
      at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
      at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229)
      at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679)
      at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:563)
      at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
      at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
      at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
      at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
      Caused by: org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The server experienced an unexpected error when processing the request
      at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
      at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
      

      In this way, all of the Streams application's threads are shut down.
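      The application-side trace shows two failures in sequence: the commit of task 0_9 fails because EndTxn returns an unexpected server error, and the later abortTransaction() issued while closing the task is rejected with that first error as its cause, after which the thread goes from RUNNING to PENDING_SHUTDOWN. The stdlib-only model below is a simplified, hypothetical stand-in for the producer's transaction state (the real logic lives in the client's TransactionManager); it assumes, consistent with the trace, that an unhandled EndTxn error leaves the producer in a fatal state in which every further transactional call fails.

```java
// Hypothetical, simplified model of a transactional producer's state; it only
// reproduces the failure sequence seen in the Streams log.
class TxnProducerModel {

    enum State { IN_TRANSACTION, COMMITTED, ABORTED, FATAL_ERROR }

    private State state = State.IN_TRANSACTION;
    private RuntimeException lastError;

    State state() { return state; }

    // Simulates commitTransaction(); serverError models an EndTxn response
    // carrying an error the client has no specific handling for.
    void commit(boolean serverError) {
        failIfInErrorState();
        if (serverError) {
            lastError = new RuntimeException(
                "Unhandled error in EndTxnResponse: The server experienced an unexpected error when processing the request");
            state = State.FATAL_ERROR; // terminal: the producer can only be closed
            throw lastError;
        }
        state = State.COMMITTED;
    }

    // Simulates abortTransaction(), which Streams calls while closing the task.
    void abort() {
        failIfInErrorState();
        state = State.ABORTED;
    }

    // Rejects any transactional call once the fatal error has been recorded,
    // chaining the original EndTxn failure as the cause.
    private void failIfInErrorState() {
        if (state == State.FATAL_ERROR) {
            throw new IllegalStateException(
                "Cannot execute transactional method because we are in an error state", lastError);
        }
    }

    public static void main(String[] args) {
        TxnProducerModel producer = new TxnProducerModel();
        try {
            producer.commit(true);
        } catch (RuntimeException e) {
            System.out.println("commit failed: " + e.getMessage());
        }
        try {
            producer.abort();
        } catch (IllegalStateException e) {
            System.out.println("abort failed: " + e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
        }
    }
}
```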

      Attachments

        1. sswvlp6801.server.log.2018-12-01-11.gz
          425 kB
          Sebastian Puzoń
        2. server.log.2018-11-29-16.gz
          139 kB
          Sebastian Puzoń

          People

            Assignee: Unassigned
            Reporter: Sebastian Puzoń (spuzon)
            Votes: 0
            Watchers: 4
