Kafka / KAFKA-15429

Kafka Streams attempts to commit on a closed producer when shutting down after an exception when running with EOS

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.4.0
    • Fix Version/s: 3.6.0, 3.5.2
    • Component/s: streams
    • Labels: None

    Description

      When a StreamThread catches an exception thrown from run, completeShutdown calls unsubscribe on the consumer (https://github.com/apache/kafka/blob/1bb8c11f5aa07709ce1b1b6ef684a6750242d4b0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1200).

      The unsubscribe call invokes the partition-revocation callback, which tries to commit open transactions. The commit throws because the producer was already closed earlier in completeShutdown, when it called taskManager.shutdown. The StreamThread then exits with an exception like:

      at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:969)
      at org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:753)
      at dev.responsive.kafka.clients.ResponsiveProducer.sendOffsetsToTransaction(ResponsiveProducer.java:104)
      at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:300)
      at org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:203)
      at org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:985)
      at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:95)
      at dev.responsive.kafka.clients.ResponsiveConsumer$RebalanceListener.lambda$onPartitionsRevoked$2(ResponsiveConsumer.java:76)
      at java.base/java.util.Optional.ifPresent(Unknown Source)
      at dev.responsive.kafka.clients.ResponsiveConsumer$RebalanceListener.onPartitionsRevoked(ResponsiveConsumer.java:76)
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:347)
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:907)
      at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1085)
      at dev.responsive.kafka.clients.ResponsiveConsumer.unsubscribe(ResponsiveConsumer.java:124)
      at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1200)
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:586)
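
      The ordering problem can be shown without a broker. The following is a minimal sketch, not the actual Streams code: FakeProducer, FakeConsumer and ShutdownOrderDemo are hypothetical stand-ins that only mimic the call order described above (close the producer, then unsubscribe, which triggers a commit from the revocation callback). The use of IllegalStateException and its message are assumptions, since the trace above does not include the exception type.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownOrderDemo {

    /** Hypothetical stand-in for the producer: committing after close() throws. */
    static final class FakeProducer {
        private boolean closed = false;

        void close() {
            closed = true;
        }

        void commitTransaction() {
            if (closed) {
                // Assumed exception type and message, for illustration only.
                throw new IllegalStateException("producer is closed");
            }
        }
    }

    /** Hypothetical stand-in for the consumer: unsubscribe() runs the revocation callback. */
    static final class FakeConsumer {
        private final Runnable onPartitionsRevoked;

        FakeConsumer(Runnable onPartitionsRevoked) {
            this.onPartitionsRevoked = onPartitionsRevoked;
        }

        void unsubscribe() {
            onPartitionsRevoked.run();
        }
    }

    /** Mimics the shutdown order from the description and records what happens. */
    static List<String> completeShutdown() {
        List<String> events = new ArrayList<>();
        FakeProducer producer = new FakeProducer();
        FakeConsumer consumer = new FakeConsumer(() -> {
            events.add("onPartitionsRevoked: committing");
            producer.commitTransaction();
            events.add("commit succeeded");
        });

        // Step 1: shutting down the tasks closes the producer.
        producer.close();
        events.add("producer closed");

        // Step 2: unsubscribe fires the revocation callback, which tries to commit.
        try {
            consumer.unsubscribe();
        } catch (IllegalStateException e) {
            events.add("commit failed: " + e.getMessage());
        }
        return events;
    }

    public static void main(String[] args) {
        completeShutdown().forEach(System.out::println);
    }
}
```

      In this sketch the callback never reaches "commit succeeded", because the producer is closed before unsubscribe runs. In the real thread the exception is not caught at this point and propagates out of run, as in the trace above.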

            People

              Assignee: Rohan Desai (rohanpd)
              Reporter: Rohan Desai (rohanpd)
              Votes: 0
              Watchers: 4
