Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala =================================================================== --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1175616) +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy) @@ -83,8 +83,8 @@ case _ => if(config.compressedTopics.contains(topicAndEvents._1._1)) { if(logger.isTraceEnabled) - logger.trace("Sending %d messages with compression %d to topic %s on partition %d" - .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2, config.compressionCodec.codec)) + logger.trace("Sending %d messages with compression codec %d to topic %s on partition %d" + .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2)) new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*) } else { Index: core/src/main/scala/kafka/producer/async/ProducerSendThread.scala =================================================================== --- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (revision 1175616) +++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (working copy) @@ -69,7 +69,7 @@ var full: Boolean = false // drain the queue until you get a shutdown command - Stream.continually(queue.poll(scala.math.max(0, queueTime - (lastSend - SystemTime.milliseconds)), TimeUnit.MILLISECONDS)) + Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS)) .takeWhile(item => if(item != null) item.getData != shutdownCommand else true).foreach { currentQueueItem => val elapsed = (SystemTime.milliseconds - lastSend) @@ -129,4 +129,4 @@ logger.trace(event.getData.toString) } } -} \ No newline at end of file +}