Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala =================================================================== --- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (revision 1159148) +++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (working copy) @@ -43,22 +43,7 @@ private var deepValidByteCount = -1L def this(compressionCodec: CompressionCodec, messages: Message*) { - this( - compressionCodec match { - case NoCompressionCodec => - val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) - for (message <- messages) { - message.serializeTo(buffer) - } - buffer.rewind - buffer - case _ => - val message = CompressionUtils.compress(messages, compressionCodec) - val buffer = ByteBuffer.allocate(message.serializedSize) - message.serializeTo(buffer) - buffer.rewind - buffer - }, 0L, ErrorMapping.NoError) + this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError) } Index: core/src/main/scala/kafka/message/MessageSet.scala =================================================================== --- core/src/main/scala/kafka/message/MessageSet.scala (revision 1159148) +++ core/src/main/scala/kafka/message/MessageSet.scala (working copy) @@ -51,7 +51,30 @@ * The size of a size-delimited entry in a message set */ def entrySize(message: Message): Int = LogOverhead + message.size - + + def createByteBuffer(compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = + compressionCodec match { + case NoCompressionCodec => + val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) + for (message <- messages) { + message.serializeTo(buffer) + } + buffer.rewind + buffer + case _ => + messages.size match { + case 0 => + val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) + buffer.rewind + buffer + case _ => + val message = CompressionUtils.compress(messages, compressionCodec) + val buffer = ByteBuffer.allocate(message.serializedSize) + message.serializeTo(buffer) + buffer.rewind + buffer + } + } } Index: core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala =================================================================== --- core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (revision 1159148) +++ core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (working copy) @@ -31,24 +31,7 @@ def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError) def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) { - this(compressionCodec match { - case NoCompressionCodec => - val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) - val messageIterator = messages.iterator - while(messageIterator.hasNext) { - val message = messageIterator.next - message.serializeTo(buffer) - } - buffer.rewind - buffer - case _ => - import scala.collection.JavaConversions._ - val message = CompressionUtils.compress(asBuffer(messages), compressionCodec) - val buffer = ByteBuffer.allocate(message.serializedSize) - message.serializeTo(buffer) - buffer.rewind - buffer - }, 0L, ErrorMapping.NoError) + this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*), 0L, ErrorMapping.NoError) } def this(messages: java.util.List[Message]) {