Description
When a StreamThread catches an exception in run, it calls completeShutdown, which in turn calls unsubscribe on the consumer (https://github.com/apache/kafka/blob/1bb8c11f5aa07709ce1b1b6ef684a6750242d4b0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1200).
Calling unsubscribe invokes the partition revocation callback, which tries to commit open transactions. The commit throws because the producer was already closed earlier in completeShutdown, when it called taskManager.shutdown. The StreamThread then exits with an exception like:
at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:969)
at org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:753)
at dev.responsive.kafka.clients.ResponsiveProducer.sendOffsetsToTransaction(ResponsiveProducer.java:104)
at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:300)
at org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:203)
at org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:985)
at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:95)
at dev.responsive.kafka.clients.ResponsiveConsumer$RebalanceListener.lambda$onPartitionsRevoked$2(ResponsiveConsumer.java:76)
at java.base/java.util.Optional.ifPresent(Unknown Source)
at dev.responsive.kafka.clients.ResponsiveConsumer$RebalanceListener.onPartitionsRevoked(ResponsiveConsumer.java:76)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:347)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:907)
at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1085)
at dev.responsive.kafka.clients.ResponsiveConsumer.unsubscribe(ResponsiveConsumer.java:124)
at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1200)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:586)
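
The ordering problem described above can be reproduced in isolation. The following is a minimal sketch, not Kafka code: FakeProducer, FakeConsumer, and shutdown are hypothetical stand-ins that only model "close the producer, then unsubscribe, which fires a revocation callback that commits". The reversed ordering is included purely for contrast and is not meant as the proposed fix.

```java
import java.util.Optional;

public class ShutdownOrderSketch {

    /** Hypothetical stand-in for a transactional producer (not the Kafka class). */
    static final class FakeProducer {
        private boolean closed = false;

        void commitTransaction() {
            // Mirrors the idea of a "throw if closed" guard: no operations after close.
            if (closed) {
                throw new IllegalStateException("producer already closed");
            }
        }

        void close() {
            closed = true;
        }
    }

    /** Hypothetical stand-in for a consumer that runs a callback on unsubscribe. */
    static final class FakeConsumer {
        private final Runnable onPartitionsRevoked;

        FakeConsumer(Runnable onPartitionsRevoked) {
            this.onPartitionsRevoked = onPartitionsRevoked;
        }

        void unsubscribe() {
            // Unsubscribing revokes all partitions, so the callback fires here.
            onPartitionsRevoked.run();
        }
    }

    /** Runs a shutdown in the given order and returns the failure message, if any. */
    static Optional<String> shutdown(boolean closeProducerFirst) {
        FakeProducer producer = new FakeProducer();
        // The revocation callback tries to commit the open transaction.
        FakeConsumer consumer = new FakeConsumer(producer::commitTransaction);
        try {
            if (closeProducerFirst) {
                producer.close();        // analogous to taskManager.shutdown closing the producer
                consumer.unsubscribe();  // callback commits on a closed producer and throws
            } else {
                consumer.unsubscribe();  // callback commits while the producer is still open
                producer.close();
            }
            return Optional.empty();
        } catch (IllegalStateException e) {
            return Optional.of(e.getMessage());
        }
    }

    public static void main(String[] args) {
        System.out.println("close then unsubscribe: " + shutdown(true));
        System.out.println("unsubscribe then close: " + shutdown(false));
    }
}
```

In this sketch only the first ordering fails, which matches the stack trace: the failure comes from the commit triggered inside the revocation callback, not from unsubscribe itself.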