Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala =================================================================== --- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (revision 1159111) +++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (working copy) @@ -53,11 +53,20 @@ buffer.rewind buffer case _ => - val message = CompressionUtils.compress(messages, compressionCodec) - val buffer = ByteBuffer.allocate(message.serializedSize) - message.serializeTo(buffer) - buffer.rewind - buffer + messages.size match { + case 0 => + // KAFKA-109: If there are no messages to compress, create an empty byte buffer exactly like the one + // created by the empty uncompressed data case above + val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) + buffer.rewind + buffer + case _ => + val message = CompressionUtils.compress(messages, compressionCodec) + val buffer = ByteBuffer.allocate(message.serializedSize) + message.serializeTo(buffer) + buffer.rewind + buffer + } }, 0L, ErrorMapping.NoError) }