Description
The transaction manager does currently not handle producer fenced errors returned from a offset commit request.
We found this bug because we encountered the following exception in our soak cluster:
org.apache.kafka.streams.errors.StreamsException: Error encountered trying to commit a transaction [stream-thread [i-037c09b3c48522d8d-StreamThread-3] task [0_0]] at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:256) at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1050) at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1013) at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:886) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:677) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [2020-10-22T04:09:54+02:00] (streams-soak-2-7-eos-alpha_soak_i-037c09b3c48522d8d_streamslog) Caused by: org.apache.kafka.common.KafkaException: Unexpected error in TxnOffsetCommitResponse: There is a newer producer with the same transactionalId which fences the current one. at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1726) at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576) at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at java.lang.Thread.run(Thread.java:748)