From c97ccefa68c72901b41e0d651af8769bdd607b1a Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Wed, 25 Mar 2015 13:25:35 -0700
Subject: [PATCH] KAFKA-527.v2

---
 .../scala/kafka/consumer/ConsumerIterator.scala    |  2 +-
 core/src/main/scala/kafka/log/LogSegment.scala     |  2 +-
 .../scala/kafka/message/ByteBufferMessageSet.scala | 63 ++++++++++++++--------
 .../main/scala/kafka/tools/DumpLogSegments.scala   |  2 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 10 ++--
 .../unit/kafka/producer/SyncProducerTest.scala     |  2 +-
 6 files changed, 51 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 78fbf75..b00a4dc 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -37,7 +37,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
                              val clientId: String)
   extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
 
-  private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
+  private val current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
   private var currentTopicInfo: PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
   private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index ac96434..0256764 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -182,7 +182,7 @@ class LogSegment(val log: FileMessageSet,
         case NoCompressionCodec =>
           entry.offset
         case _ =>
-          ByteBufferMessageSet.decompress(entry.message).head.offset
+          ByteBufferMessageSet.deepIterator(entry.message).next().offset
       }
       index.append(startOffset, validBytes)
       lastIndexEntry = validBytes
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 9c69471..89ab807 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -17,12 +17,13 @@
 package kafka.message
 
-import kafka.utils.Logging
+import kafka.utils.{IteratorTemplate, Logging}
+import kafka.common.KafkaException
+
 import java.nio.ByteBuffer
 import java.nio.channels._
-import java.io.{InputStream, ByteArrayOutputStream, DataOutputStream}
+import java.io._
 import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.IteratorTemplate
 
 object ByteBufferMessageSet {
@@ -58,24 +59,44 @@
     }
   }
 
-  def decompress(message: Message): ByteBufferMessageSet = {
-    val outputStream: ByteArrayOutputStream = new ByteArrayOutputStream
-    val inputStream: InputStream = new ByteBufferBackedInputStream(message.payload)
-    val intermediateBuffer = new Array[Byte](1024)
-    val compressed = CompressionFactory(message.compressionCodec, inputStream)
-    try {
-      Stream.continually(compressed.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
-        outputStream.write(intermediateBuffer, 0, dataRead)
+  /** Deep iterator that decompresses the message sets in-place in the compressed message. */
+  def deepIterator(wrapperMessage: Message): Iterator[MessageAndOffset] = {
+    new IteratorTemplate[MessageAndOffset] {
+
+      val inputStream: InputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
+      val compressed: DataInputStream = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, inputStream))
+
+      override def makeNext(): MessageAndOffset = {
+        try {
+          // read the offset
+          val offset = compressed.readLong()
+          // read record size
+          val size = compressed.readInt()
+
+          if (size < Message.MinHeaderSize)
+            throw new InvalidMessageException("Message found with corrupt size (" + size + ")")
+
+          // read the record into an intermediate record buffer
+          // and hence has to do extra copy
+          val bufferArray = new Array[Byte](size)
+          compressed.readFully(bufferArray, 0, size)
+          val buffer = ByteBuffer.wrap(bufferArray)
+
+          val newMessage = new Message(buffer)
+
+          // the decompressed message should not be a wrapper message since we do not allow nested compression
+          new MessageAndOffset(newMessage, offset)
+        } catch {
+          case eofe: EOFException =>
+            compressed.close()
+            allDone()
+          case ioe: IOException =>
+            throw new KafkaException(ioe)
+        }
       }
-    } finally {
-      compressed.close()
     }
-    val outputBuffer = ByteBuffer.allocate(outputStream.size)
-    outputBuffer.put(outputStream.toByteArray)
-    outputBuffer.rewind
-    new ByteBufferMessageSet(outputBuffer)
   }
-  
+
   private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
     buffer.putLong(offset)
     buffer.putInt(message.size)
@@ -147,7 +168,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
      var topIter = buffer.slice()
      var innerIter: Iterator[MessageAndOffset] = null
 
-     def innerDone():Boolean = (innerIter == null || !innerIter.hasNext)
+     def innerDone(): Boolean = (innerIter == null || !innerIter.hasNext)
 
      def makeNextOuter: MessageAndOffset = {
        // if there isn't at least an offset and size, we are done
@@ -176,7 +197,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
            innerIter = null
            new MessageAndOffset(newMessage, offset)
          case _ =>
-           innerIter = ByteBufferMessageSet.decompress(newMessage).internalIterator()
+           innerIter = ByteBufferMessageSet.deepIterator(newMessage)
            if(!innerIter.hasNext)
              innerIter = null
            makeNext()
@@ -191,7 +212,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
      if(innerDone())
        makeNextOuter
      else
-       innerIter.next
+       innerIter.next()
    }
  }
 
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index fe2cc11..b7a3630 100644
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -180,7 +180,7 @@ object DumpLogSegments {
         case NoCompressionCodec =>
           getSingleMessageIterator(messageAndOffset)
         case _ =>
-          ByteBufferMessageSet.decompress(message).iterator
+          ByteBufferMessageSet.deepIterator(message)
       }
     } else
       getSingleMessageIterator(messageAndOffset)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 8cd5f2f..3c0599c 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -269,13 +269,13 @@ class LogTest extends JUnitSuite {
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
 
-    def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).messageSet.head.message)
+    def read(offset: Int) = ByteBufferMessageSet.deepIterator(log.read(offset, 4096).messageSet.head.message)
 
     /* we should always get the first message in the compressed set when reading any offset in the set */
-    assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
-    assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset)
-    assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
-    assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
+    assertEquals("Read at offset 0 should produce 0", 0, read(0).next().offset)
+    assertEquals("Read at offset 1 should produce 0", 0, read(1).next().offset)
+    assertEquals("Read at offset 2 should produce 2", 2, read(2).next().offset)
+    assertEquals("Read at offset 3 should produce 2", 2, read(3).next().offset)
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index b5208a5..812df59 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -31,7 +31,7 @@ import kafka.api.ProducerResponseStatus
 import kafka.common.{TopicAndPartition, ErrorMapping}
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
-  private var messageBytes = new Array[Byte](2);
+  private val messageBytes = new Array[Byte](2)
   // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
   val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, false).head))
   val zookeeperConnect = TestZKUtils.zookeeperConnect
-- 
1.7.12.4
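
Reviewer sketch (illustrative only, not part of the patch): the snippet below shows the caller-side move from the removed decompress(), which buffered the entire decompressed payload into a new ByteBufferMessageSet, to the streaming deepIterator(), mirroring the LogSegment and LogTest changes above. It assumes ByteBufferMessageSet's existing shallowIterator helper to get at the compressed wrapper message; the object name DeepIteratorSketch is invented for the example.

  import kafka.message.{ByteBufferMessageSet, DefaultCompressionCodec, Message, MessageAndOffset}

  object DeepIteratorSketch {
    def main(args: Array[String]): Unit = {
      // Writing two messages with a compression codec produces a single
      // compressed wrapper message at the shallow level.
      val messages = new ByteBufferMessageSet(DefaultCompressionCodec,
                                              new Message("hello".getBytes),
                                              new Message("there".getBytes))
      val wrapper: Message = messages.shallowIterator.next().message

      // Old style: ByteBufferMessageSet.decompress(wrapper).head.offset
      // New style: decode inner records lazily from the compressed stream.
      val inner: Iterator[MessageAndOffset] = ByteBufferMessageSet.deepIterator(wrapper)
      println("first inner offset = " + inner.next().offset)
    }
  }

The point of the change is that decompression becomes incremental: deepIterator decodes one record per makeNext() call (readLong, readInt, readFully) instead of allocating a buffer for the whole decompressed set up front, which is why call sites switch from .head on a materialized set to .next() on an iterator.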