Index: core/src/main/scala/kafka/producer/async/ProducerSendThread.scala =================================================================== --- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (revision 1310145) +++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (working copy) @@ -73,18 +73,21 @@ // check if the queue time is reached. This happens when the poll method above returns after a timeout and // returns a null object val expired = currentQueueItem == null - if(currentQueueItem != null) { + if(currentQueueItem != null) trace("Dequeued item for topic %s and partition %d" .format(currentQueueItem.getTopic, currentQueueItem.getPartition)) - // handle the dequeued current item - if(cbkHandler != null) - events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem) - else + + // handle the dequeued current item + if(cbkHandler != null) + events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem) + else { + if (currentQueueItem != null) events += currentQueueItem + } - // check if the batch size is reached - full = events.size >= batchSize - } + // check if the batch size is reached + full = events.size >= batchSize + if(full || expired) { if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..") if(full) debug("Batch full. Sending..")