Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala =================================================================== --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1159111) +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy) @@ -23,9 +23,11 @@ import kafka.api.ProducerRequest import kafka.serializer.Encoder import java.util.Properties -import kafka.message.{NoCompressionCodec, ByteBufferMessageSet} import kafka.producer.{ProducerConfig, SyncProducerConfigShared, SyncProducerConfig, SyncProducer} +import kafka.utils.Utils +import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet} + private[kafka] class DefaultEventHandler[T](val config: ProducerConfig, val cbkHandler: CallbackHandler[T]) extends EventHandler[T] { @@ -100,8 +102,9 @@ val topicEvents = remainingEvents partition (e => e.getTopic.equals(topic)) remainingEvents = topicEvents._2 distinctPartitions.foreach { p => - val topicPartitionEvents = topicEvents._1 partition (e => (e.getPartition == p)) - collatedEvents += ( (topic, p) -> topicPartitionEvents._1.map(q => q.getData).toSeq) + val topicPartitionEvents = (topicEvents._1 partition (e => (e.getPartition == p)))._1 + if(topicPartitionEvents.size > 0) + collatedEvents += ( (topic, p) -> topicPartitionEvents.map(q => q.getData)) } } collatedEvents Index: core/src/main/scala/kafka/producer/async/ProducerSendThread.scala =================================================================== --- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (revision 1159111) +++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (working copy) @@ -109,7 +109,8 @@ def tryToHandle(events: Seq[QueueItem[T]]) { try { if(logger.isDebugEnabled) logger.debug("Handling " + events.size + " events") - handler.handle(events, underlyingProducer, serializer) + if(events.size > 0) + handler.handle(events, underlyingProducer, serializer) }catch { case e: Exception => logger.error("Error in handling batch of " + events.size + " events", e) }