Index: core/src/main/scala/kafka/message/CompressionCodec.scala =================================================================== --- core/src/main/scala/kafka/message/CompressionCodec.scala (revision 1197994) +++ core/src/main/scala/kafka/message/CompressionCodec.scala (working copy) @@ -22,6 +22,7 @@ codec match { case 0 => NoCompressionCodec case 1 => GZIPCompressionCodec + case 2 => SnappyCompressionCodec case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec)) } } @@ -33,4 +34,6 @@ case object GZIPCompressionCodec extends CompressionCodec { val codec = 1 } +case object SnappyCompressionCodec extends CompressionCodec { val codec = 2 } + case object NoCompressionCodec extends CompressionCodec { val codec = 0 } Index: core/src/main/scala/kafka/message/CompressionUtils.scala =================================================================== --- core/src/main/scala/kafka/message/CompressionUtils.scala (revision 1197994) +++ core/src/main/scala/kafka/message/CompressionUtils.scala (working copy) @@ -83,6 +83,35 @@ val oneCompressedMessage:Message = new Message(outputStream.toByteArray, compressionCodec) oneCompressedMessage + case SnappyCompressionCodec => + import org.xerial.snappy.{SnappyOutputStream} + val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream() + val snappyOutput:SnappyOutputStream = new SnappyOutputStream(outputStream) + if(logger.isDebugEnabled) + logger.debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages)) + + val messageByteBuffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) + messages.foreach(m => m.serializeTo(messageByteBuffer)) + messageByteBuffer.rewind + + try { + snappyOutput.write(messageByteBuffer.array) + } catch { + case e: IOException => logger.error("Error while writing to the Snappy output stream", e) + if(snappyOutput != null) + snappyOutput.close() + if(outputStream != null) + outputStream.close() + throw e + } finally { + if(snappyOutput != null) + snappyOutput.close() + if(outputStream != null) + outputStream.close() + } + + val oneCompressedMessage:Message = new Message(outputStream.toByteArray, compressionCodec) + oneCompressedMessage case _ => throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec) } @@ -138,6 +167,32 @@ outputBuffer.rewind val outputByteArray = outputStream.toByteArray new ByteBufferMessageSet(outputBuffer) + case SnappyCompressionCodec => + import org.xerial.snappy.{SnappyInputStream} + val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream + val inputStream:InputStream = new ByteBufferBackedInputStream(message.payload) + val snappyIn:SnappyInputStream = new SnappyInputStream(inputStream) + val intermediateBuffer = new Array[Byte](1024) + + try { + Stream.continually(snappyIn.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead => + outputStream.write(intermediateBuffer, 0, dataRead) + } + }catch { + case e: IOException => logger.error("Error while reading from the Snappy input stream", e) + if(snappyIn != null) snappyIn.close + if(outputStream != null) outputStream.close + throw e + } finally { + if(snappyIn != null) snappyIn.close + if(outputStream != null) outputStream.close + } + + val outputBuffer = ByteBuffer.allocate(outputStream.size) + outputBuffer.put(outputStream.toByteArray) + outputBuffer.rewind + val outputByteArray = outputStream.toByteArray + new ByteBufferMessageSet(outputBuffer) case _ => throw new kafka.common.UnknownCodecException("Unknown Codec: " + message.compressionCodec) } Index: project/build/KafkaProject.scala =================================================================== --- project/build/KafkaProject.scala (revision 1197994) +++ project/build/KafkaProject.scala (working copy) @@ -42,7 +42,7 @@ class CoreKafkaProject(info: ProjectInfo) extends DefaultProject(info) - with IdeaProject with CoreDependencies with TestDependencies { + with IdeaProject with CoreDependencies with TestDependencies with CompressionDependencies { val corePackageAction = packageAllAction //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required @@ -226,4 +226,7 @@ val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2" } + trait CompressionDependencies { + val snappy = "org.xerial.snappy" % "snappy-java" % "1.0.4.1" + } }