Index: core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala	(revision 1304645)
+++ core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala	(working copy)
@@ -94,6 +94,10 @@
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
+
+      //make sure shallow iterator is the same as deep iterator
+      TestUtils.checkEquals[Message](TestUtils.getMessageIterator(messageSet.shallowIterator),
+                                     TestUtils.getMessageIterator(messageSet.iterator))
     }
 
     // test for compressed regular messages
@@ -104,6 +108,8 @@
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
+
+      verifyShallowIterator(messageSet)
     }
 
     // test for mixed empty and non-empty messagesets uncompressed
@@ -121,6 +127,10 @@
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
+
+      //make sure shallow iterator is the same as deep iterator
+      TestUtils.checkEquals[Message](TestUtils.getMessageIterator(mixedMessageSet.shallowIterator),
+                                     TestUtils.getMessageIterator(mixedMessageSet.iterator))
     }
 
     // test for mixed empty and non-empty messagesets compressed
@@ -138,7 +148,15 @@
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
      //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
+
+      verifyShallowIterator(mixedMessageSet)
     }
   }
 
+  def verifyShallowIterator(messageSet: ByteBufferMessageSet) {
+    //make sure the offsets returned by a shallow iterator are a subset of those returned by a deep iterator
+    val shallowOffsets = messageSet.shallowIterator.map(msgAndOff => msgAndOff.offset).toSet
+    val deepOffsets = messageSet.iterator.map(msgAndOff => msgAndOff.offset).toSet
+    assertTrue(shallowOffsets.subsetOf(deepOffsets))
+  }
 }
Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(revision 1304645)
+++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(working copy)
@@ -78,9 +78,12 @@
     buffer.reset()
     written
   }
-
+
+  /** default iterator that iterates over decompressed messages */
   override def iterator: Iterator[MessageAndOffset] = internalIterator()
 
+  /** iterator over compressed messages without decompressing */
+  def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true)
 
   def verifyMessageSize(maxMessageSize: Int){
     var shallowIter = internalIterator(true)
@@ -124,6 +127,9 @@
           message.limit(size)
           topIter.position(topIter.position + size)
           val newMessage = new Message(message)
+          if(!newMessage.isValid)
+            throw new InvalidMessageException("message is invalid, compression codec: " + newMessage.compressionCodec +
+              " size: " + size + " curr offset: " + currValidBytes + " init offset: " + initialOffset)
 
           if(isShallow){
             currValidBytes += 4 + size
@@ -133,16 +139,12 @@
           else{
             newMessage.compressionCodec match {
               case NoCompressionCodec =>
-                if(!newMessage.isValid)
-                  throw new InvalidMessageException("Uncompressed essage is invalid")
                 debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
                 innerIter = null
                 currValidBytes += 4 + size
                 trace("currValidBytes = " + currValidBytes)
                 new MessageAndOffset(newMessage, currValidBytes)
               case _ =>
-                if(!newMessage.isValid)
-                  throw new InvalidMessageException("Compressed message is invalid")
                 debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
                 innerIter = CompressionUtils.decompress(newMessage).internalIterator()
                 if (!innerIter.hasNext) {
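Note on the change above: internalIterator(true) stops at the outermost level, so for a compressed set shallowIterator yields one MessageAndOffset per compressed wrapper message, while the default iterator recurses through CompressionUtils.decompress and yields every inner message. A minimal sketch of the difference, assuming the existing varargs ByteBufferMessageSet constructor and DefaultCompressionCodec in this codebase (the printed labels are illustrative only):

    import kafka.message._

    val set = new ByteBufferMessageSet(DefaultCompressionCodec,
                                       new Message("a".getBytes), new Message("b".getBytes))
    // deep iteration decompresses: one MessageAndOffset per original message
    set.iterator.foreach(mo => println("deep offset:    " + mo.offset))
    // shallow iteration does not: a single MessageAndOffset for the compressed wrapper
    set.shallowIterator.foreach(mo => println("shallow offset: " + mo.offset))

This is also why verifyShallowIterator asserts only subset containment: a shallow offset marks the end of a whole batch, a position the deep iterator also reaches once it has drained that batch.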
offset: " + currValidBytes + " init offset: " + initialOffset) if(isShallow){ currValidBytes += 4 + size @@ -133,16 +139,12 @@ else{ newMessage.compressionCodec match { case NoCompressionCodec => - if(!newMessage.isValid) - throw new InvalidMessageException("Uncompressed essage is invalid") debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes)) innerIter = null currValidBytes += 4 + size trace("currValidBytes = " + currValidBytes) new MessageAndOffset(newMessage, currValidBytes) case _ => - if(!newMessage.isValid) - throw new InvalidMessageException("Compressed message is invalid") debug("Message is compressed. Valid byte count = %d".format(currValidBytes)) innerIter = CompressionUtils.decompress(newMessage).internalIterator() if (!innerIter.hasNext) { Index: core/src/main/scala/kafka/consumer/KafkaMessageStream.scala =================================================================== --- core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (revision 1304645) +++ core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (working copy) @@ -27,11 +27,12 @@ class KafkaMessageStream[T](val topic: String, private val queue: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int, - private val decoder: Decoder[T]) + private val decoder: Decoder[T], + val enableShallowIterator: Boolean) extends Iterable[T] with java.lang.Iterable[T]{ private val iter: ConsumerIterator[T] = - new ConsumerIterator[T](topic, queue, consumerTimeoutMs, decoder) + new ConsumerIterator[T](topic, queue, consumerTimeoutMs, decoder, enableShallowIterator) /** * Create an iterator over messages in the stream. Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerIterator.scala (revision 1304645) +++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala (working copy) @@ -31,7 +31,8 @@ class ConsumerIterator[T](private val topic: String, private val channel: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int, - private val decoder: Decoder[T]) + private val decoder: Decoder[T], + val enableShallowIterator: Boolean) extends IteratorTemplate[T] with Logging { private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null) @@ -74,7 +75,8 @@ .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo)) currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset) } - localCurrent = currentDataChunk.messages.iterator + localCurrent = if (enableShallowIterator) currentDataChunk.messages.shallowIterator + else currentDataChunk.messages.iterator current.set(localCurrent) } } Index: core/src/main/scala/kafka/consumer/ConsumerConfig.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerConfig.scala (revision 1304645) +++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala (working copy) @@ -105,5 +105,12 @@ val mirrorConsumerNumThreads = Utils.getInt( props, MirrorConsumerNumThreadsProp, MirrorConsumerNumThreads) + + /** Use shallow iterator over compressed messages directly. This feature should be used very carefully. + * Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the + * overhead of decompression. 
Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision 1304645)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(working copy)
@@ -178,7 +178,7 @@
       for (threadId <- threadIdSet) {
         val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         queues.put((topic, threadId), stream)
-        streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder)
+        streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
       }
       ret += (topic -> streamList)
       debug("adding topic " + topic + " and stream to map..")
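With the connector change above, every stream handed out by createMessageStreams inherits config.enableShallowIterator. A sketch of the mirroring scenario the ConsumerConfig comment has in mind, assuming the existing Consumer.create and createMessageStreams APIs; the topic name and the forward() sink are hypothetical:

    val connector = kafka.consumer.Consumer.create(config)  // config built as above
    val streams = connector.createMessageStreams(Map("raw-topic" -> 1))
    for (message <- streams("raw-topic").head) {
      // with shallowiterator.enable=true each element may still be a compressed
      // wrapper message, so a mirror can re-publish it without decompress/recompress
      forward(message)  // hypothetical producer-side sink
    }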