Details
Bug
Status: Resolved
Major
Resolution: Fixed
0.8.2.0
None
None
Description
The producer doesn't drain unsent data properly on close. The cause is the following code in Sender.run(): this loop can exit while requests are still unfinished.
    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    int unsent = 0;
    do {
        try {
            unsent = run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    } while (unsent > 0 || this.inFlightRequests.totalInFlightRequests() > 0);
Suppose all produce requests have been sent and the sender is waiting for responses when the broker fails. In handling the disconnect, we clear all in-flight requests. When the condition in the while clause is then checked, there is no unsent data and there are no in-flight requests, so the loop exits. However, the failed records have been added back to the RecordAccumulator and are ready to be sent in the next iteration, which never runs.
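
The sequence described above can be reproduced with a small toy model. This is only a sketch under stated assumptions, not the actual Sender code: ToySender, runOnce(), hasUnsent() and drain() are hypothetical names, the broker is assumed to fail exactly once and then recover, and the extra accumulator check is one possible way to close the gap, not necessarily the fix that was committed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DrainOnCloseSketch {

    /** Toy stand-in for the sender; every name here is hypothetical. */
    static class ToySender {
        final Deque<String> accumulator = new ArrayDeque<>(); // plays the role of RecordAccumulator
        final List<String> inFlight = new ArrayList<>();      // plays the role of inFlightRequests
        final List<String> delivered = new ArrayList<>();
        boolean brokerDown = true; // assumption: the broker fails once, then recovers

        /** One I/O iteration; returns how many records were ready and sent. */
        int runOnce() {
            // Send everything that is currently queued.
            int sent = accumulator.size();
            inFlight.addAll(accumulator);
            accumulator.clear();

            if (brokerDown) {
                // Disconnect: clear in-flight requests and re-enqueue their records for retry.
                accumulator.addAll(inFlight);
                inFlight.clear();
                brokerDown = false;
            } else {
                // Normal case: every in-flight record is acknowledged.
                delivered.addAll(inFlight);
                inFlight.clear();
            }
            return sent;
        }

        boolean hasUnsent() {
            return !accumulator.isEmpty();
        }
    }

    /** Runs the shutdown loop and returns how many records are left stranded. */
    public static int drain(boolean alsoCheckAccumulator) {
        ToySender sender = new ToySender();
        sender.inFlight.add("record-1"); // already sent, awaiting a response when close begins

        int unsent = 0;
        do {
            unsent = sender.runOnce();
        } while (unsent > 0
                || !sender.inFlight.isEmpty()
                || (alsoCheckAccumulator && sender.hasUnsent()));

        return sender.accumulator.size();
    }

    public static void main(String[] args) {
        System.out.println("original condition, records left: " + drain(false));
        System.out.println("with accumulator check, records left: " + drain(true));
    }
}
```

With the original condition the loop exits after the first iteration and one record stays in the accumulator. With the additional check the loop keeps running until the retried record has been sent and acknowledged.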