From 8ccb5b130cc32a3e8c03871ef3e453c343752261 Mon Sep 17 00:00:00 2001 From: Michael Tamm Date: Tue, 28 Aug 2012 12:43:53 +0200 Subject: [PATCH] Added constructors to Message which accept offset and size --- core/src/main/scala/kafka/message/Message.scala | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index 272a0b6..43d485e 100644 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -102,10 +102,10 @@ object Message { class Message(val buffer: ByteBuffer) { import kafka.message.Message._ - - - private def this(checksum: Long, bytes: Array[Byte], compressionCodec: CompressionCodec) = { - this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + bytes.length)) + + + private def this(checksum: Long, bytes: Array[Byte], offset: Int, size: Int, compressionCodec: CompressionCodec) = { + this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + size)) buffer.put(CurrentMagicValue) var attributes:Byte = 0 if (compressionCodec.codec > 0) { @@ -113,18 +113,22 @@ class Message(val buffer: ByteBuffer) { } buffer.put(attributes) Utils.putUnsignedInt(buffer, checksum) - buffer.put(bytes) + buffer.put(bytes, offset, size) buffer.rewind() } - def this(checksum:Long, bytes:Array[Byte]) = this(checksum, bytes, NoCompressionCodec) - - def this(bytes: Array[Byte], compressionCodec: CompressionCodec) = { + def this(checksum:Long, bytes:Array[Byte]) = this(checksum, bytes, 0, bytes.length, NoCompressionCodec) + + def this(bytes: Array[Byte], offset: Int, size: Int, compressionCodec: CompressionCodec) = { //Note: we're not crc-ing the attributes header, so we're susceptible to bit-flipping there - this(Utils.crc32(bytes), bytes, compressionCodec) + this(Utils.crc32(bytes, offset, size), bytes, offset, size, compressionCodec) } - def this(bytes: Array[Byte]) = this(bytes, NoCompressionCodec) + def this(bytes: Array[Byte], compressionCodec: CompressionCodec) = this(bytes, 0, bytes.length, compressionCodec) + + def this(bytes: Array[Byte], offset: Int, size: Int) = this(bytes, offset, size, NoCompressionCodec) + + def this(bytes: Array[Byte]) = this(bytes, 0, bytes.length, NoCompressionCodec) def size: Int = buffer.limit -- 1.7.11.msysgit.1