Description
In our Kafka Streams applications (with EOS enabled), we were seeing mysteriously long delays between a record being produced by one stream task and the same record being consumed by the downstream task. These delays consistently turned out to be roughly retry.backoff.ms long; reducing that value reduced the delays by about the same amount.
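For reference, a minimal sketch of the kind of Streams configuration involved (the application id, bootstrap address, and the 20 ms value are illustrative placeholders, not our actual settings):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
// EOS routes produce requests through the transactional path in the producer's Sender.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
// Lowering the embedded producer's retry.backoff.ms shrank the observed delays accordingly.
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 20);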
After digging further, I pinned down the problem to the following lines in org.apache.kafka.clients.producer.internals.Sender#runOnce:
} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {
    // as long as there are outstanding transactional requests, we simply wait for them to return
    client.poll(retryBackoffMs, time.milliseconds());
    return;
}
This code seems to assume that, if maybeSendTransactionalRequest returns true, a transactional request has been sent out that should be waited for. However, this is not true if the request requires a coordinator lookup:
if (nextRequestHandler.needsCoordinator()) {
    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
    if (targetNode == null) {
        transactionManager.lookupCoordinator(nextRequestHandler);
        break;
    }
    ...
lookupCoordinator() does not actually send anything; it merely enqueues a coordinator lookup request for the Sender's next run loop iteration. maybeSendTransactionalRequest nevertheless returns true (the break jumps to a return true at the end of the method), so the Sender needlessly waits in client.poll() even though no request is actually in flight.
I think the fix is to let maybeSendTransactionalRequest return false if it merely enqueues the coordinator lookup instead of actually sending anything. But I'm not sure, hence the bug report instead of a pull request.
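To make the suggestion concrete, here is a rough sketch of what I mean inside maybeSendTransactionalRequest (simplified, untested, and only illustrating the coordinator-lookup branch quoted above):

if (nextRequestHandler.needsCoordinator()) {
    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
    if (targetNode == null) {
        // Only enqueues a FindCoordinator request for the next run loop iteration;
        // nothing is put in flight here.
        transactionManager.lookupCoordinator(nextRequestHandler);
        // Proposed change: instead of breaking out to the "return true" at the end of
        // the method, report that no request was sent, so runOnce() does not block in
        // client.poll(retryBackoffMs, ...) waiting for a response that will never come.
        return false;
    }
    ...
}

Whether returning false directly here interacts cleanly with the rest of the method's bookkeeping is exactly the part I am unsure about.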