Index: core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala	(revision 1344473)
+++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala	(working copy)
@@ -70,10 +70,11 @@
         trace("Dequeued item for topic %s, partition key: %s, data: %s"
           .format(currentQueueItem.getTopic, currentQueueItem.getKey.toString, currentQueueItem.getData.toString))
         events += currentQueueItem
+      }
 
-        // check if the batch size is reached
-        full = events.size >= batchSize
-      }
+      // check if the batch size is reached
+      full = events.size >= batchSize
+
       if(full || expired) {
         if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..")
         if(full) debug("Batch full. Sending..")
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1344473)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -96,7 +96,7 @@
           // TODO: need to handle ack's here! Will probably move to another method.
           kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
           val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
-          log.append(partitionData.messages)
+          log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
          offsets(msgIndex) = log.nextAppendOffset
          errors(msgIndex) = ErrorMapping.NoError.toShort
          trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
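
Note on the two hunks above: the ProducerSendThread change moves the closing brace of the dequeued-item block up so that the batch-size check runs once per pass through the drain loop, regardless of whether the poll returned an item. The KafkaApis change adds a cast, presumably because Log.append takes a ByteBufferMessageSet while partitionData.messages is declared as the more general MessageSet type.

For review purposes, here is a minimal, self-contained sketch of the patched drain-loop control flow. It is not the actual ProducerSendThread code: the queue, the String event type, and the batchSize/queueTimeMs values are placeholder assumptions standing in for the real config and QueueItem types.

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

object BatchingLoopSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder queue and events; the real thread dequeues QueueItem[T].
    val queue = new LinkedBlockingQueue[String]()
    (1 to 25).foreach(i => queue.put("event-" + i))

    val batchSize = 10     // stands in for the producer's batch size config
    val queueTimeMs = 100L // stands in for the producer's queue time config

    while (!queue.isEmpty) {
      val events = new ListBuffer[String]
      val lastSend = System.currentTimeMillis
      var full = false
      var expired = false

      while (!full && !expired) {
        val elapsed = System.currentTimeMillis - lastSend
        expired = elapsed >= queueTimeMs
        // Wait only for the time remaining in this batch's window.
        val item = queue.poll(queueTimeMs - math.min(elapsed, queueTimeMs),
                              TimeUnit.MILLISECONDS)
        if (item != null) {
          events += item
        }
        // check if the batch size is reached -- outside the null check,
        // mirroring the patched control flow above
        full = events.size >= batchSize
      }

      if (full || expired)
        println("Sending batch of " + events.size +
                " events (full=" + full + ", expired=" + expired + ")")
    }
  }
}

Run standalone, this prints two full batches of 10 followed by an expired batch of 5. The point of the rearrangement is that the full/expired decision is evaluated on every iteration, including ones where the poll timed out and returned null, rather than only on iterations that appended an event.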