Description
https://issues.apache.org/jira/browse/KAFKA-14831 fixed a producer bug in the internal TX state transitions, and the producer now throws an `IllegalStateException` in situations where it previously swallowed an internal error.
This change surfaces a bug in Kafka Streams: Kafka Streams calls `abortTransaction()` blindly when a task is closed dirty, even if the Producer is already in an internal fatal state. However, if the Producer is in a fatal state, Kafka Streams should skip `abortTransaction` and only `close()` the Producer when closing a task dirty.
The bug surfaces after `commitTransaction()` timed out, or after a `send()` call failed with an `InvalidProducerEpochException`; either case leads to the call to `abortTransaction()`. Kafka Streams does not currently track whether a commit-TX is in progress.
java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` because the previous call to `commitTransaction` timed out and must be retried
    at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1203)
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:326)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274)
and
[2024-01-16 04:19:32,584] ERROR [kafka-producer-network-thread | i-01aea6907970b1bf6-StreamThread-1-producer] stream-thread [i-01aea6907970b1bf6-StreamThread-1] stream-task [1_2] Error encountered sending record to topic joined-counts for task 1_2 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
// followed by
[2024-01-16 04:19:32,587] ERROR [kafka-producer-network-thread | i-01aea6907970b1bf6-StreamThread-1-producer] [Producer clientId=i-01aea6907970b1bf6-StreamThread-1-producer, transactionalId=stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender)
java.lang.IllegalStateException: TransactionalId stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:996)
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:451)
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:664)
    at org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:669)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:835)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:819)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:771)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
    at org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:627)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:612)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
    at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$8(Sender.java:917)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:460)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:337)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:252)
    at java.lang.Thread.run(Thread.java:750)
If the Producer throws an `IllegalStateException` on `abortTransaction()`, Kafka Streams treats this exception ("correctly") as fatal, and the StreamThread dies. However, Kafka Streams is actually in a state from which it can recover, and thus should not let the StreamThread die. Instead, it should carry on by not calling `abortTransaction()` and moving forward with the dirty close of the task.
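The proposed behavior can be sketched as follows. This is only an illustration under stated assumptions, not the actual Kafka Streams code: `TxProducer`, `DirtyCloseGuard`, `RecordingProducer`, and the `onFatalError()` hook are all hypothetical names, and the sketch assumes the caller can tell when the producer has entered a fatal state (for example after a timed-out `commitTransaction()` or an `InvalidProducerEpochException` from `send()`).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two producer calls discussed above.
// This is NOT the real Kafka client interface.
interface TxProducer {
    void abortTransaction();
    void close();
}

// Hypothetical helper that remembers whether the producer hit a fatal error,
// so that a dirty close can skip abortTransaction() in that case.
class DirtyCloseGuard {
    private final TxProducer producer;
    private boolean transactionInFlight = false;
    private boolean fatalErrorSeen = false;

    DirtyCloseGuard(TxProducer producer) {
        this.producer = producer;
    }

    // Called when a transaction has been started.
    void onBeginTransaction() {
        transactionInFlight = true;
    }

    // Assumed hook: invoked when commitTransaction() timed out or send()
    // failed with a fencing error such as InvalidProducerEpochException.
    void onFatalError() {
        fatalErrorSeen = true;
    }

    // Dirty close: abort only if the producer is still usable, always close.
    void closeDirty() {
        try {
            if (transactionInFlight && !fatalErrorSeen) {
                producer.abortTransaction();
            }
        } finally {
            transactionInFlight = false;
            producer.close();
        }
    }
}

// Test double that records which calls were made and can simulate the
// IllegalStateException the producer throws once it is in a fatal state.
class RecordingProducer implements TxProducer {
    final List<String> calls = new ArrayList<>();
    boolean failOnAbort = false;

    @Override
    public void abortTransaction() {
        calls.add("abort");
        if (failOnAbort) {
            throw new IllegalStateException("simulated: producer is in a fatal state");
        }
    }

    @Override
    public void close() {
        calls.add("close");
    }
}
```

With a healthy producer the guard calls `abortTransaction()` and then `close()`; once `onFatalError()` has been reported, only `close()` is called, so the simulated `IllegalStateException` is never raised and the calling thread can continue with the dirty close.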
It is currently unclear how https://issues.apache.org/jira/browse/KAFKA-14567 is related: it has a similar stack trace, but it was reported before https://issues.apache.org/jira/browse/KAFKA-14831 was merged.