After a Kafka Streams application with exactly-once semantics enabled has performed its first commit, the RecordAccumulator inside the application's internal producer gets stuck in a state where every subsequently allocated ProducerBatch is flushed immediately instead of being held in memory until it expires, regardless of the stream's linger or batch size configuration.
This is reproduced by the example code at https://github.com/niclaslockner/kafka-12870, which can be run with ./gradlew run --args=<bootstrap servers>
The example has a producer that sends 1 record/sec to one topic, and a Kafka Streams application with EOS enabled that forwards the records from that topic to another topic, configured with linger = 5 sec and commit interval = 10 sec.
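For reference, the configuration described above can be sketched as plain Kafka Streams properties. This is a hedged sketch, not the example repository's actual code: the application id is hypothetical, and the string keys are the standard Kafka Streams config names (`processing.guarantee`, `commit.interval.ms`, and the `producer.`-prefixed producer override):

```java
import java.util.Properties;

public class StreamsConfigSketch {
    // Builds a Streams configuration matching the reproduction scenario:
    // EOS enabled, 10 s commit interval, 5 s producer linger.
    public static Properties streamsProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("application.id", "kafka-12870-repro"); // hypothetical id
        // Exactly-once processing: each commit also flushes the internal producer.
        props.put("processing.guarantee", "exactly_once");
        props.put("commit.interval.ms", "10000");
        // Producer override via the "producer." prefix: batches should be
        // held for up to 5 s before being sent.
        props.put("producer.linger.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        Properties p = streamsProps("localhost:9092");
        System.out.println(p.getProperty("producer.linger.ms")); // prints 5000
    }
}
```

With these settings one would expect at most two flush points: the 5 s linger expiry and the 10 s commit.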
The expected behavior when running the example is that the stream's ProducerBatches expire (or are flushed by the commit) every 5 seconds, and that the stream's producer sends a ProduceRequest every 5 seconds containing an expired ProducerBatch with 5 records.
The actual behavior is that each ProducerBatch is made available to the Sender immediately, and the Sender sends one ProduceRequest per record.
The example code contains a copy of the RecordAccumulator class (from kafka-clients 2.8.0) with additional logging added to
- RecordAccumulator#ready(Cluster, long)
These log entries show (see the attached RecordsAccumulator.log):
- that the batches are considered sendable because a flush is in progress
- that Sender.maybeSendAndPollTransactionalRequest() calls RecordAccumulator's beginFlush() without a matching awaitFlushCompletion(), which makes RecordAccumulator's flushesInProgress counter oscillate between 1 and 2 instead of the expected 0 and 1.
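The imbalance described above can be illustrated with a minimal stdlib-only model of the accumulator's flush bookkeeping. The names mirror RecordAccumulator, but this is a simplified sketch of the counter semantics, not the actual client code:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of RecordAccumulator's flush bookkeeping.
public class FlushCounterSketch {
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);

    public void beginFlush() { flushesInProgress.incrementAndGet(); }

    public void awaitFlushCompletion() { flushesInProgress.decrementAndGet(); }

    // While this returns true, ready() treats every batch as immediately
    // sendable, bypassing linger and batch-size limits.
    public boolean flushInProgress() { return flushesInProgress.get() > 0; }

    public static void main(String[] args) {
        FlushCounterSketch acc = new FlushCounterSketch();

        // Balanced usage (as in KafkaProducer.flush()): the counter returns
        // to 0, so batching resumes once the flush completes.
        acc.beginFlush();
        acc.awaitFlushCompletion();
        System.out.println(acc.flushInProgress()); // false

        // Unbalanced usage, as observed in the transactional Sender path:
        // beginFlush() without a matching awaitFlushCompletion() leaves the
        // counter at >= 1 forever, so every batch is considered sendable.
        acc.beginFlush();
        System.out.println(acc.flushInProgress()); // true, and stays true
    }
}
```

Once the counter can never drop back to 0, the observed behavior follows: every newly allocated batch is drained immediately, producing one ProduceRequest per record.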
This issue is not reproducible in version 2.3.1 or 2.4.1.