Description
Under the case of idempotence is enabled, when a batch reaches its request.timeout.ms but not yet reaches delivery.timeout.ms, it will be retried and wait for another request.timeout.ms. During the time of this interval, the delivery.timeout.ms may be reached and Sender will remove this in flight batch and bump the producer epoch because of the unresolved sequence, then the sequence of this partition will be reset to 0.
At this time, if a new batch is sent to the same partition and the former batch reaches request.timeout.ms again, we will see an exception being thrown out by NetworkClient:
[ERROR] [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Uncaught error in request completion: java.lang.IllegalStateException: We are re-enqueueing a batch which is not tracked as part of the in flight requests. batch.topicPartition: txn_test_1648891362900-2; batch.baseSequence: 0 at org.apache.kafka.clients.producer.internals.RecordAccumulator.insertInSequenceOrder(RecordAccumulator.java:388) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] at org.apache.kafka.clients.producer.internals.RecordAccumulator.reenqueue(RecordAccumulator.java:334) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] at org.apache.kafka.clients.producer.internals.Sender.reenqueueBatch(Sender.java:668) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:622) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:548) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?] at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_102]
The cause of this is the inflightBatchesBySequence in TransactionManager is not being remove correctly. One batch may be removed by another batch with the same sequence number.
The potential consequence of this I can think out is that the send progress will be blocked until the latter batch being expired by delivery.timeout.ms