From c142fad2c7acecec728c1d19943ba9c49f26b4ac Mon Sep 17 00:00:00 2001 From: Joel Koshy Date: Mon, 18 May 2015 09:31:47 -0700 Subject: [PATCH 1/6] Rebased --- core/src/main/scala/kafka/log/LogCleaner.scala | 142 ++++++++++++++++----- .../test/scala/kafka/tools/TestLogCleaning.scala | 36 ++++-- .../unit/kafka/log/LogCleanerIntegrationTest.scala | 62 ++++++--- 3 files changed, 173 insertions(+), 67 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index abea8b2..89175c4 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -21,7 +21,6 @@ import kafka.common._ import kafka.message._ import kafka.utils._ import kafka.metrics.KafkaMetricsGroup - import scala.collection._ import scala.math import java.nio._ @@ -30,8 +29,10 @@ import java.io.File import java.lang.IllegalStateException import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit - import com.yammer.metrics.core.Gauge +import scala.collection.mutable.ListBuffer +import java.io.ByteArrayOutputStream +import java.io.DataOutputStream /** * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy. @@ -389,7 +390,6 @@ private[log] class Cleaner(val id: Int, * @param map The key=>offset mapping * @param retainDeletes Should delete tombstones be retained while cleaning this segment * - * TODO: Implement proper compression support */ private[log] def cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment, dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) { @@ -403,28 +403,13 @@ private[log] class Cleaner(val id: Int, throttler.maybeThrottle(messages.sizeInBytes) // check each message to see if it is to be retained var messagesRead = 0 - for (entry <- messages) { - messagesRead += 1 - val size = MessageSet.entrySize(entry.message) - position += size - stats.readMessage(size) - val key = entry.message.key - if (key != null) { - val foundOffset = map.get(key) - /* two cases in which we can get rid of a message: - * 1) if there exists a message with the same key but higher offset - * 2) if the message is a delete "tombstone" marker and enough time has passed - */ - val redundant = foundOffset >= 0 && entry.offset < foundOffset - val obsoleteDelete = !retainDeletes && entry.message.isNull - if (!redundant && !obsoleteDelete) { - ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset) - stats.recopyMessage(size) - } - } else { - stats.invalidMessage() - } + for (entry <- messages.shallowIterator) { + if (entry.message.compressionCodec == NoCompressionCodec) + messagesRead += cleanNonCompressedMessages(source, map, retainDeletes, entry) + else + messagesRead += cleanCompressedMessages(source, map, retainDeletes, entry) } + position += messages.validBytes // if any messages are to be retained, write them out if(writeBuffer.position > 0) { writeBuffer.flip() @@ -439,7 +424,94 @@ private[log] class Cleaner(val id: Int, } restoreBuffers() } - + + /** + * clean the non-compressed message + */ + private def cleanNonCompressedMessages(source: kafka.log.LogSegment, + map: kafka.log.OffsetMap, + retainDeletes: Boolean, + entry: kafka.message.MessageAndOffset): Int = { + val size = MessageSet.entrySize(entry.message) + stats.readMessage(size) + if(retainMessage(source, map, retainDeletes, entry)) { + ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset) + stats.recopyMessage(size) + } + 1 + } + 
+ private def retainMessage(source: kafka.log.LogSegment, + map: kafka.log.OffsetMap, + retainDeletes: Boolean, + entry: kafka.message.MessageAndOffset): Boolean = { + val key = entry.message.key + if (key != null) { + val foundOffset = map.get(key) + /* two cases in which we can get rid of a message: + * 1) if there exists a message with the same key but higher offset + * 2) if the message is a delete "tombstone" marker and enough time has passed + */ + val redundant = foundOffset >= 0 && entry.offset < foundOffset + val obsoleteDelete = !retainDeletes && entry.message.isNull + !redundant && !obsoleteDelete + } else { + stats.invalidMessage() + false + } + } + + /** + * clean the compressed message + */ + private def cleanCompressedMessages(source: kafka.log.LogSegment, + map: kafka.log.OffsetMap, + retainDeletes: Boolean, + messageAndOffset: kafka.message.MessageAndOffset): Int = { + val size = MessageSet.entrySize(messageAndOffset.message) + stats.readMessage(size) + + val messages = ByteBufferMessageSet.deepIterator(messageAndOffset.message) + var messagesRead = 0 + var retainedMessages = ListBuffer[MessageAndOffset]() + for (entry <- messages) { + messagesRead += 1 + if(retainMessage(source, map, retainDeletes, entry)) { + retainedMessages += entry + } + } + + if(retainedMessages.size != 0) { + val message = compressMessages(messageAndOffset.message.compressionCodec, retainedMessages.toList) + ByteBufferMessageSet.writeMessage(writeBuffer, message.message, message.offset) + stats.recopyMessage(MessageSet.entrySize(message.message)) + } + messagesRead + } + + /** + * compresses the given the list of messages + */ + private def compressMessages(compressionCodec: CompressionCodec, messages: List[MessageAndOffset]): MessageAndOffset = { + val byteArrayStream = new ByteArrayOutputStream(MessageSet.messageSetSize(messages.map(m => m.message))) + val output = new DataOutputStream(CompressionFactory(compressionCodec, byteArrayStream)) + var offset = -1L + try { + for (messageAndOffset <- messages) { + offset = messageAndOffset.offset + val message = messageAndOffset.message + output.writeLong(offset) + output.writeInt(message.size) + output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) + } + } finally { + output.close() + } + val bytes = byteArrayStream.toByteArray + val message = new Message(bytes, compressionCodec) + new MessageAndOffset(message, offset) +} + /** * Double the I/O buffer capacity */ @@ -542,13 +614,14 @@ private[log] class Cleaner(val id: Int, val startPosition = position for (entry <- messages) { val message = entry.message - val size = MessageSet.entrySize(message) - position += size if (message.hasKey) map.put(message.key, entry.offset) offset = entry.offset - stats.indexMessage(size) + stats.indexMessagesRead(1) } + position += messages.validBytes + stats.indexMessageBytesRead(messages.validBytes) + // if we didn't read even one complete message, our read buffer may be too small if(position == startPosition) growBuffers() @@ -580,16 +653,19 @@ private case class CleanerStats(time: Time = SystemTime) { messagesWritten += 1 bytesWritten += size } - - def indexMessage(size: Int) { - mapMessagesRead += 1 + + def indexMessagesRead(size: Int) { + mapMessagesRead += size + } + + def indexMessageBytesRead(size: Int) { mapBytesRead += size } - + def indexDone() { mapCompleteTime = time.milliseconds } - + def allDone() { endTime = time.milliseconds } diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala 
b/core/src/test/scala/kafka/tools/TestLogCleaning.scala index 8445894..dcbfbe1 100755 --- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala +++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala @@ -52,6 +52,11 @@ object TestLogCleaning { .describedAs("count") .ofType(classOf[java.lang.Long]) .defaultsTo(Long.MaxValue) + val messageCompressionOpt = parser.accepts("compression-type", "message compression type") + .withOptionalArg() + .describedAs("compressionType") + .ofType(classOf[java.lang.String]) + .defaultsTo("none") val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.") .withRequiredArg .describedAs("count") @@ -84,38 +89,39 @@ object TestLogCleaning { .withRequiredArg .describedAs("directory") .ofType(classOf[String]) - + val options = parser.parse(args:_*) - + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "An integration test for log cleaning.") - + if(options.has(dumpOpt)) { dumpLog(new File(options.valueOf(dumpOpt))) System.exit(0) } - + CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt, numMessagesOpt) - + // parse options val messages = options.valueOf(numMessagesOpt).longValue + val compressionType = options.valueOf(messageCompressionOpt) val percentDeletes = options.valueOf(percentDeletesOpt).intValue val dups = options.valueOf(numDupsOpt).intValue val brokerUrl = options.valueOf(brokerOpt) val topicCount = options.valueOf(topicsOpt).intValue val zkUrl = options.valueOf(zkConnectOpt) val sleepSecs = options.valueOf(sleepSecsOpt).intValue - + val testId = new Random().nextInt(Int.MaxValue) val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray - + println("Producing %d messages...".format(messages)) - val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, percentDeletes) + val producedDataFile = produceMessages(brokerUrl, topics, messages, compressionType, dups, percentDeletes) println("Sleeping for %d seconds...".format(sleepSecs)) Thread.sleep(sleepSecs * 1000) println("Consuming messages...") val consumedDataFile = consumeMessages(zkUrl, topics) - + val producedLines = lineCount(producedDataFile) val consumedLines = lineCount(consumedDataFile) val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble @@ -169,9 +175,9 @@ object TestLogCleaning { } producedDeduped.close() consumedDeduped.close() + println("Validated " + total + " values, " + mismatched + " mismatches.") require(!produced.hasNext, "Additional values produced not found in consumer log.") require(!consumed.hasNext, "Additional values consumed not found in producer log.") - println("Validated " + total + " values, " + mismatched + " mismatches.") require(mismatched == 0, "Non-zero number of row mismatches.") // if all the checks worked out we can delete the deduped files producedDedupedFile.delete() @@ -233,10 +239,11 @@ object TestLogCleaning { }.start() new BufferedReader(new InputStreamReader(process.getInputStream()), 10*1024*1024) } - - def produceMessages(brokerUrl: String, - topics: Array[String], - messages: Long, + + def produceMessages(brokerUrl: String, + topics: Array[String], + messages: Long, + compressionType: String, dups: Int, percentDeletes: Int): File = { val producerProps = new Properties @@ -244,6 +251,7 @@ object TestLogCleaning { producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl) producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") 
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType) val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps) val rand = new Random(1) val keyCount = (messages / dups).toInt diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 3b5aa9d..d12e46d 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -18,21 +18,27 @@ package kafka.log import java.io.File +import kafka.common.TopicAndPartition +import kafka.message._ import kafka.server.OffsetCheckpoint +import kafka.utils._ +import org.apache.kafka.common.record.CompressionType +import org.junit.Assert._ +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.runners.Parameterized.Parameters +import org.scalatest.junit.JUnit3Suite import scala.collection._ -import org.junit._ -import kafka.common.TopicAndPartition -import kafka.utils._ -import kafka.message._ -import org.scalatest.junit.JUnitSuite -import junit.framework.Assert._ + /** * This is an integration test that tests the fully integrated log cleaner */ -class LogCleanerIntegrationTest extends JUnitSuite { - +@RunWith(value = classOf[Parameterized]) +class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite { + val time = new MockTime() val segmentSize = 100 val deleteDelay = 1000 @@ -40,16 +46,16 @@ class LogCleanerIntegrationTest extends JUnitSuite { val logDir = TestUtils.tempDir() var counter = 0 val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2)) - + @Test def cleanerTest() { val cleaner = makeCleaner(parts = 3) val log = cleaner.logs.get(topics(0)) - val appends = writeDups(numKeys = 100, numDups = 3, log) + val appends = writeDups(numKeys = 100, numDups = 3, log, CompressionCodec.getCompressionCodec(compressionCodec)) val startSize = log.size cleaner.startup() - + val lastCleaned = log.activeSegment.baseOffset // wait until we clean up to base_offset of active segment - minDirtyMessages cleaner.awaitCleaned("log", 0, lastCleaned) @@ -57,9 +63,9 @@ class LogCleanerIntegrationTest extends JUnitSuite { val read = readFromLog(log) assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap) assertTrue(startSize > log.size) - + // write some more stuff and validate again - val appends2 = appends ++ writeDups(numKeys = 100, numDups = 3, log) + val appends2 = appends ++ writeDups(numKeys = 100, numDups = 3, log, CompressionCodec.getCompressionCodec(compressionCodec)) val lastCleaned2 = log.activeSegment.baseOffset cleaner.awaitCleaned("log", 0, lastCleaned2) val read2 = readFromLog(log) @@ -79,19 +85,25 @@ class LogCleanerIntegrationTest extends JUnitSuite { cleaner.shutdown() } - + def readFromLog(log: Log): Iterable[(Int, Int)] = { - for(segment <- log.logSegments; message <- segment.log) yield { - val key = TestUtils.readString(message.message.key).toInt - val value = TestUtils.readString(message.message.payload).toInt + for (segment <- log.logSegments; entry <- segment.log; message <- { + if (entry.message.compressionCodec == NoCompressionCodec) + Stream.cons(entry, Stream.empty).iterator + else + ByteBufferMessageSet.deepIterator(entry.message) + }) yield { + val 
message = entry.message + val key = TestUtils.readString(message.key).toInt + val value = TestUtils.readString(message.payload).toInt key -> value } } - - def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = { + + def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec): Seq[(Int, Int)] = { for(dup <- 0 until numDups; key <- 0 until numKeys) yield { val count = counter - log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true) + log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes), assignOffsets = true) counter += 1 (key, count) } @@ -128,4 +140,14 @@ class LogCleanerIntegrationTest extends JUnitSuite { time = time) } +} + +object LogCleanerIntegrationTest { + @Parameters + def parameters: java.util.Collection[Array[String]] = { + val list = new java.util.ArrayList[Array[String]]() + for (codec <- CompressionType.values) + list.add(Array(codec.name)) + list + } } \ No newline at end of file -- 2.3.2 (Apple Git-55) From aaab53095e5c0ccba5a45fc57cd9d3171650ea2d Mon Sep 17 00:00:00 2001 From: Joel Koshy Date: Mon, 18 May 2015 13:09:08 -0700 Subject: [PATCH 2/6] Remove no-compression constraint for compacted topics --- .../scala/kafka/message/ByteBufferMessageSet.scala | 3 --- core/src/test/scala/unit/kafka/log/LogTest.scala | 25 ++++------------------ 2 files changed, 4 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 9dfe914..7f97279 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -255,9 +255,6 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi buffer.reset() this } else { - if (compactedTopic && targetCodec != NoCompressionCodec) - throw new InvalidMessageException("Compacted topic cannot accept compressed messages. 
" + - "Either the producer sent a compressed message or the topic has been configured with a broker-side compression codec.") // We need to crack open the message-set if any of these are true: // (i) messages are compressed, // (ii) this message-set is sent to a compacted topic (and so we need to verify that each message has a key) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 76d3bfd..8e095d6 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -337,6 +337,7 @@ class LogTest extends JUnitSuite { val messageSetWithUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage, keyedMessage) val messageSetWithOneUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage) val messageSetWithCompressedKeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage) + val messageSetWithCompressedUnkeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage, unkeyedMessage) val messageSetWithKeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage) val messageSetWithKeyedMessages = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage, anotherKeyedMessage) @@ -356,8 +357,8 @@ class LogTest extends JUnitSuite { case e: InvalidMessageException => // this is good } try { - log.append(messageSetWithCompressedKeyedMessage) - fail("Compacted topics cannot accept compressed messages.") + log.append(messageSetWithCompressedUnkeyedMessage) + fail("Compacted topics cannot accept a message without a key.") } catch { case e: InvalidMessageException => // this is good } @@ -365,25 +366,7 @@ class LogTest extends JUnitSuite { // the following should succeed without any InvalidMessageException log.append(messageSetWithKeyedMessage) log.append(messageSetWithKeyedMessages) - - // test that a compacted topic with broker-side compression type set to uncompressed can accept compressed messages - val uncompressedLog = new Log(logDir, logConfig.copy(compact = true, compressionType = "uncompressed"), - recoveryPoint = 0L, time.scheduler, time) - uncompressedLog.append(messageSetWithCompressedKeyedMessage) - uncompressedLog.append(messageSetWithKeyedMessage) - uncompressedLog.append(messageSetWithKeyedMessages) - try { - uncompressedLog.append(messageSetWithUnkeyedMessage) - fail("Compacted topics cannot accept a message without a key.") - } catch { - case e: InvalidMessageException => // this is good - } - try { - uncompressedLog.append(messageSetWithOneUnkeyedMessage) - fail("Compacted topics cannot accept a message without a key.") - } catch { - case e: InvalidMessageException => // this is good - } + log.append(messageSetWithCompressedKeyedMessage) } /** -- 2.3.2 (Apple Git-55) From 7d90fae8c846bcb144914d123fa540bc7cdca227 Mon Sep 17 00:00:00 2001 From: Joel Koshy Date: Mon, 18 May 2015 14:47:53 -0700 Subject: [PATCH 3/6] Fix log cleaner integration test --- .../unit/kafka/log/LogCleanerIntegrationTest.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index d12e46d..50d1985 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -18,6 +18,7 @@ package kafka.log import java.io.File + import kafka.common.TopicAndPartition import 
kafka.message._ import kafka.server.OffsetCheckpoint @@ -37,7 +38,7 @@ import scala.collection._ * This is an integration test that tests the fully integrated log cleaner */ @RunWith(value = classOf[Parameterized]) -class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite { +class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite with Logging { val time = new MockTime() val segmentSize = 100 @@ -87,15 +88,15 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite { } def readFromLog(log: Log): Iterable[(Int, Int)] = { - for (segment <- log.logSegments; entry <- segment.log; message <- { - if (entry.message.compressionCodec == NoCompressionCodec) - Stream.cons(entry, Stream.empty).iterator + for (segment <- log.logSegments; shallowEntry <- segment.log; entry <- { + // create single message iterator or deep iterator depending on compression codec + if (shallowEntry.message.compressionCodec == NoCompressionCodec) + Stream.cons(shallowEntry, Stream.empty).iterator else - ByteBufferMessageSet.deepIterator(entry.message) + ByteBufferMessageSet.deepIterator(shallowEntry.message) }) yield { - val message = entry.message - val key = TestUtils.readString(message.key).toInt - val value = TestUtils.readString(message.payload).toInt + val key = TestUtils.readString(entry.message.key).toInt + val value = TestUtils.readString(entry.message.payload).toInt key -> value } } -- 2.3.2 (Apple Git-55) From f47966c71848367c7611f29536823e4e374df08f Mon Sep 17 00:00:00 2001 From: Joel Koshy Date: Tue, 19 May 2015 00:10:01 -0700 Subject: [PATCH 4/6] Incorporate edits from latest patch --- core/src/main/scala/kafka/log/LogCleaner.scala | 149 +++++++++------------ .../unit/kafka/log/LogCleanerIntegrationTest.scala | 16 +-- 2 files changed, 71 insertions(+), 94 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 89175c4..70ad661 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -17,22 +17,18 @@ package kafka.log +import java.io.{DataOutputStream, File} +import java.nio._ +import java.util.Date +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import com.yammer.metrics.core.Gauge import kafka.common._ import kafka.message._ -import kafka.utils._ import kafka.metrics.KafkaMetricsGroup +import kafka.utils._ + import scala.collection._ -import scala.math -import java.nio._ -import java.util.Date -import java.io.File -import java.lang.IllegalStateException -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import com.yammer.metrics.core.Gauge -import scala.collection.mutable.ListBuffer -import java.io.ByteArrayOutputStream -import java.io.DataOutputStream /** * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy. 
@@ -404,11 +400,29 @@ private[log] class Cleaner(val id: Int, // check each message to see if it is to be retained var messagesRead = 0 for (entry <- messages.shallowIterator) { - if (entry.message.compressionCodec == NoCompressionCodec) - messagesRead += cleanNonCompressedMessages(source, map, retainDeletes, entry) - else - messagesRead += cleanCompressedMessages(source, map, retainDeletes, entry) + val size = MessageSet.entrySize(entry.message) + stats.readMessage(size) + if (entry.message.compressionCodec == NoCompressionCodec) { + if(shouldRetainMessage(source, map, retainDeletes, entry)) { + ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset) + stats.recopyMessage(size) + } + messagesRead += 1 + } + else { + val messages = ByteBufferMessageSet.deepIterator(entry.message) + val retainedMessages = messages.filter(messageAndOffset => { + messagesRead += 1 + shouldRetainMessage(source, map, retainDeletes, messageAndOffset) + }).toList + + if (retainedMessages.nonEmpty) { + val size = compressMessages(writeBuffer, entry.message.compressionCodec, retainedMessages) + stats.recopyMessage(size) + } + } } + position += messages.validBytes // if any messages are to be retained, write them out if(writeBuffer.position > 0) { @@ -425,26 +439,40 @@ private[log] class Cleaner(val id: Int, restoreBuffers() } - /** - * clean the non-compressed message - */ - private def cleanNonCompressedMessages(source: kafka.log.LogSegment, - map: kafka.log.OffsetMap, - retainDeletes: Boolean, - entry: kafka.message.MessageAndOffset): Int = { - val size = MessageSet.entrySize(entry.message) - stats.readMessage(size) - if(retainMessage(source, map, retainDeletes, entry)) { - ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset) - stats.recopyMessage(size) + private def compressMessages(buffer: ByteBuffer, compressionCodec: CompressionCodec, messages: Seq[MessageAndOffset]): Int = { + val messagesIterable = messages.toIterable.map(_.message) + if(messages.isEmpty) { + MessageSet.Empty.sizeInBytes + } else if(compressionCodec == NoCompressionCodec) { + for(messageOffset <- messages) + ByteBufferMessageSet.writeMessage(buffer, messageOffset.message, messageOffset.offset) + MessageSet.messageSetSize(messagesIterable) + } else { + var offset = -1L + val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messagesIterable) / 2, 1024), 1 << 16)) + messageWriter.write(codec = compressionCodec) { outputStream => + val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream)) + try { + for (messageOffset <- messages) { + val message = messageOffset.message + offset = messageOffset.offset + output.writeLong(offset) + output.writeInt(message.size) + output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) + } + } finally { + output.close() + } + } + ByteBufferMessageSet.writeMessage(buffer, messageWriter, offset) + messageWriter.size + MessageSet.LogOverhead } - 1 } - private def retainMessage(source: kafka.log.LogSegment, - map: kafka.log.OffsetMap, - retainDeletes: Boolean, - entry: kafka.message.MessageAndOffset): Boolean = { + private def shouldRetainMessage(source: kafka.log.LogSegment, + map: kafka.log.OffsetMap, + retainDeletes: Boolean, + entry: kafka.message.MessageAndOffset): Boolean = { val key = entry.message.key if (key != null) { val foundOffset = map.get(key) @@ -462,57 +490,6 @@ private[log] class Cleaner(val id: Int, } /** - * clean the compressed message - */ - private def 
cleanCompressedMessages(source: kafka.log.LogSegment, - map: kafka.log.OffsetMap, - retainDeletes: Boolean, - messageAndOffset: kafka.message.MessageAndOffset): Int = { - val size = MessageSet.entrySize(messageAndOffset.message) - stats.readMessage(size) - - val messages = ByteBufferMessageSet.deepIterator(messageAndOffset.message) - var messagesRead = 0 - var retainedMessages = ListBuffer[MessageAndOffset]() - for (entry <- messages) { - messagesRead += 1 - if(retainMessage(source, map, retainDeletes, entry)) { - retainedMessages += entry - } - } - - if(retainedMessages.size != 0) { - val message = compressMessages(messageAndOffset.message.compressionCodec, retainedMessages.toList) - ByteBufferMessageSet.writeMessage(writeBuffer, message.message, message.offset) - stats.recopyMessage(MessageSet.entrySize(message.message)) - } - messagesRead - } - - /** - * compresses the given the list of messages - */ - private def compressMessages(compressionCodec: CompressionCodec, messages: List[MessageAndOffset]): MessageAndOffset = { - val byteArrayStream = new ByteArrayOutputStream(MessageSet.messageSetSize(messages.map(m => m.message))) - val output = new DataOutputStream(CompressionFactory(compressionCodec, byteArrayStream)) - var offset = -1L - try { - for (messageAndOffset <- messages) { - offset = messageAndOffset.offset - val message = messageAndOffset.message - output.writeLong(offset) - output.writeInt(message.size) - output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) - } - } finally { - output.close() - } - val bytes = byteArrayStream.toByteArray - val message = new Message(bytes, compressionCodec) - new MessageAndOffset(message, offset) -} - - /** * Double the I/O buffer capacity */ def growBuffers() { @@ -620,7 +597,7 @@ private[log] class Cleaner(val id: Int, stats.indexMessagesRead(1) } position += messages.validBytes - stats.indexMessageBytesRead(messages.validBytes) + stats.indexBytesRead(messages.validBytes) // if we didn't read even one complete message, our read buffer may be too small if(position == startPosition) @@ -658,7 +635,7 @@ private case class CleanerStats(time: Time = SystemTime) { mapMessagesRead += size } - def indexMessageBytesRead(size: Int) { + def indexBytesRead(size: Int) { mapBytesRead += size } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 50d1985..471ddff 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -38,7 +38,7 @@ import scala.collection._ * This is an integration test that tests the fully integrated log cleaner */ @RunWith(value = classOf[Parameterized]) -class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite with Logging { +class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite { val time = new MockTime() val segmentSize = 100 @@ -88,15 +88,15 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite w } def readFromLog(log: Log): Iterable[(Int, Int)] = { - for (segment <- log.logSegments; shallowEntry <- segment.log; entry <- { + for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- { // create single message iterator or deep iterator depending on compression codec - if (shallowEntry.message.compressionCodec == NoCompressionCodec) - Stream.cons(shallowEntry, Stream.empty).iterator + if (entry.message.compressionCodec == 
NoCompressionCodec) + Stream.cons(entry, Stream.empty).iterator else - ByteBufferMessageSet.deepIterator(shallowEntry.message) + ByteBufferMessageSet.deepIterator(entry.message) }) yield { - val key = TestUtils.readString(entry.message.key).toInt - val value = TestUtils.readString(entry.message.payload).toInt + val key = TestUtils.readString(messageAndOffset.message.key).toInt + val value = TestUtils.readString(messageAndOffset.message.payload).toInt key -> value } } @@ -148,7 +148,7 @@ object LogCleanerIntegrationTest { def parameters: java.util.Collection[Array[String]] = { val list = new java.util.ArrayList[Array[String]]() for (codec <- CompressionType.values) - list.add(Array(codec.name)) + list.add(Array(codec.name)) list } } \ No newline at end of file -- 2.3.2 (Apple Git-55) From 1b64bec99cb9ff9906cd6d92941df8826b2179f7 Mon Sep 17 00:00:00 2001 From: Joel Koshy Date: Tue, 19 May 2015 01:04:59 -0700 Subject: [PATCH 5/6] More minor edits --- core/src/main/scala/kafka/log/LogCleaner.scala | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 70ad661..4338184 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -403,7 +403,7 @@ private[log] class Cleaner(val id: Int, val size = MessageSet.entrySize(entry.message) stats.readMessage(size) if (entry.message.compressionCodec == NoCompressionCodec) { - if(shouldRetainMessage(source, map, retainDeletes, entry)) { + if (shouldRetainMessage(source, map, retainDeletes, entry)) { ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset) stats.recopyMessage(size) } @@ -414,18 +414,16 @@ private[log] class Cleaner(val id: Int, val retainedMessages = messages.filter(messageAndOffset => { messagesRead += 1 shouldRetainMessage(source, map, retainDeletes, messageAndOffset) - }).toList + }).toSeq - if (retainedMessages.nonEmpty) { - val size = compressMessages(writeBuffer, entry.message.compressionCodec, retainedMessages) - stats.recopyMessage(size) - } + if (retainedMessages.nonEmpty) + compressMessages(writeBuffer, entry.message.compressionCodec, retainedMessages) } } position += messages.validBytes // if any messages are to be retained, write them out - if(writeBuffer.position > 0) { + if (writeBuffer.position > 0) { writeBuffer.flip() val retained = new ByteBufferMessageSet(writeBuffer) dest.append(retained.head.offset, retained) @@ -433,17 +431,17 @@ private[log] class Cleaner(val id: Int, } // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again - if(readBuffer.limit > 0 && messagesRead == 0) + if (readBuffer.limit > 0 && messagesRead == 0) growBuffers() } restoreBuffers() } - private def compressMessages(buffer: ByteBuffer, compressionCodec: CompressionCodec, messages: Seq[MessageAndOffset]): Int = { + private def compressMessages(buffer: ByteBuffer, compressionCodec: CompressionCodec, messages: Seq[MessageAndOffset]) { val messagesIterable = messages.toIterable.map(_.message) - if(messages.isEmpty) { + if (messages.isEmpty) { MessageSet.Empty.sizeInBytes - } else if(compressionCodec == NoCompressionCodec) { + } else if (compressionCodec == NoCompressionCodec) { for(messageOffset <- messages) ByteBufferMessageSet.writeMessage(buffer, messageOffset.message, messageOffset.offset) MessageSet.messageSetSize(messagesIterable) @@ -465,7 +463,7 @@ private[log] class Cleaner(val 
id: Int, } } ByteBufferMessageSet.writeMessage(buffer, messageWriter, offset) - messageWriter.size + MessageSet.LogOverhead + stats.recopyMessage(messageWriter.size + MessageSet.LogOverhead) } } -- 2.3.2 (Apple Git-55) From 68fc6d04412efa47ad268f9ef186ef4d265cce55 Mon Sep 17 00:00:00 2001 From: Joel Koshy Date: Tue, 19 May 2015 16:19:31 -0700 Subject: [PATCH 6/6] Incorporate Guozhang's comments --- core/src/main/scala/kafka/log/LogCleaner.scala | 3 +-- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala | 7 +++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 4338184..c9ade72 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -408,8 +408,7 @@ private[log] class Cleaner(val id: Int, stats.recopyMessage(size) } messagesRead += 1 - } - else { + } else { val messages = ByteBufferMessageSet.deepIterator(entry.message) val retainedMessages = messages.filter(messageAndOffset => { messagesRead += 1 diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 7f97279..5a32de8 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -255,10 +255,9 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi buffer.reset() this } else { - // We need to crack open the message-set if any of these are true: - // (i) messages are compressed, - // (ii) this message-set is sent to a compacted topic (and so we need to verify that each message has a key) - // If the broker is configured with a target compression codec then we need to recompress regardless of original codec + // We need to deep-iterate over the message-set if any of these are true: + // (i) messages are compressed + // (ii) the topic is configured with a target compression codec so we need to recompress regardless of original codec val messages = this.internalIterator(isShallow = false).map(messageAndOffset => { if (compactedTopic && !messageAndOffset.message.hasKey) throw new InvalidMessageException("Compacted topic cannot accept message without key.") -- 2.3.2 (Apple Git-55)
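
The core of the series is the retention test that cleanInto applies to every message, whether it arrives as a plain entry or inside a compressed wrapper: a message is dropped if the offset map records a higher offset for its key, or if it is a delete tombstone and tombstones no longer need to be retained; messages without a key are invalid on a compacted topic. The sketch below distills that rule with simplified stand-in types (SketchMessage and OffsetMap are not Kafka classes); it illustrates the logic of shouldRetainMessage, it is not the cleaner itself.

// Self-contained sketch of the retention rule applied by shouldRetainMessage in the
// patches above. SketchMessage and OffsetMap are simplified stand-ins, not Kafka types.
case class SketchMessage(key: Option[String], value: Option[String], offset: Long) {
  def isTombstone: Boolean = value.isEmpty
}

object RetentionRuleSketch {
  /** Latest offset recorded per key during the offset-map (dedupe) phase. */
  type OffsetMap = Map[String, Long]

  def shouldRetain(map: OffsetMap, retainDeletes: Boolean, m: SketchMessage): Boolean =
    m.key match {
      case Some(k) =>
        // 1) redundant: a message with the same key exists at a higher offset
        val redundant = map.get(k).exists(m.offset < _)
        // 2) obsolete delete: a tombstone whose grace period has passed
        val obsoleteDelete = !retainDeletes && m.isTombstone
        !redundant && !obsoleteDelete
      case None =>
        false // unkeyed messages are invalid on a compacted topic
    }

  def main(args: Array[String]): Unit = {
    val map: OffsetMap = Map("a" -> 10L, "b" -> 5L)
    val batch = Seq(
      SketchMessage(Some("a"), Some("v1"), 3L),  // dropped: offset 10 supersedes it
      SketchMessage(Some("a"), Some("v2"), 10L), // kept: latest value for key "a"
      SketchMessage(Some("b"), None, 5L)         // tombstone: dropped once retainDeletes is false
    )
    println(batch.filter(m => shouldRetain(map, retainDeletes = false, m)))
  }
}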
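
When a compressed wrapper survives cleaning with only some of its inner messages retained, the patches rebuild a single wrapper: each retained message is written as an (offset, size, bytes) record through a compression stream, and the new wrapper takes the offset of the last retained inner message. The sketch below mirrors that shape in isolation; GZIPOutputStream stands in for Kafka's CompressionFactory (which also covers snappy and lz4), and raw byte arrays stand in for Message instances.

import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.util.zip.GZIPOutputStream

// Sketch of how the cleaner re-wraps retained messages into one compressed wrapper,
// mirroring compressMessages above. GZIPOutputStream is a stand-in for Kafka's
// CompressionFactory, and Array[Byte] payloads are stand-ins for Message instances.
object RecompressSketch {
  final case class Retained(offset: Long, payload: Array[Byte])

  /** Returns (wrapperOffset, compressedBytes); the wrapper takes the last inner offset. */
  def recompress(retained: Seq[Retained]): (Long, Array[Byte]) = {
    require(retained.nonEmpty, "nothing to re-wrap")
    val byteStream = new ByteArrayOutputStream()
    val out = new DataOutputStream(new GZIPOutputStream(byteStream))
    var lastOffset = -1L
    try {
      for (Retained(offset, payload) <- retained) {
        lastOffset = offset
        out.writeLong(offset)        // inner offset, preserved across the rewrite
        out.writeInt(payload.length) // inner record size
        out.write(payload)           // inner record bytes
      }
    } finally {
      out.close() // flushes the compressor footer
    }
    (lastOffset, byteStream.toByteArray)
  }

  def main(args: Array[String]): Unit = {
    val (offset, bytes) = recompress(Seq(
      Retained(42L, "k1=v3".getBytes("UTF-8")),
      Retained(47L, "k2=v9".getBytes("UTF-8"))
    ))
    println(s"wrapper offset=$offset, compressed size=${bytes.length} bytes")
  }
}

Giving the wrapper the highest retained inner offset keeps the segment's offset ordering intact after the rewrite, which is why the patch tracks the last offset while writing the compressed payload.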
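
On the test-tool side, patch 1 only needs one producer knob to generate compressed input: compression.type, passed straight through from the new --compression-type option. A minimal stand-alone producer with the same configuration follows; the broker address and topic name are placeholders.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// Sketch of the producer-side setting TestLogCleaning gains above: compression.type
// decides whether the cleaner will see compressed wrapper messages at all.
object CompressedProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArraySerializer")
    // same codec names the new producer accepts, e.g. "none", "gzip", "snappy", "lz4"
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")

    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    try {
      producer.send(new ProducerRecord("log-cleaner-test", "key1".getBytes, "value1".getBytes))
    } finally {
      producer.close()
    }
  }
}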
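
The integration test is re-run once per codec by JUnit's Parameterized runner: the runner instantiates the test class for each parameter row and passes the codec name to the constructor, and in Scala the @Parameters factory has to live on the companion object so JUnit can find it reflectively. The hypothetical test below shows just that wiring, with a hard-coded codec list and a trivial assertion in place of the real write-dups-then-clean round trip.

import org.junit.Assert.assertTrue
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters

// Minimal JUnit 4 sketch of the Parameterized pattern used by LogCleanerIntegrationTest.
// The class and test names are hypothetical; the codec list is hard-coded instead of
// coming from CompressionType.values.
@RunWith(classOf[Parameterized])
class CodecParameterizedSketch(codecName: String) {

  @Test
  def codecNameIsKnown(): Unit = {
    // Stand-in assertion; the real test appends duplicate keys with this codec and
    // verifies the cleaned log still holds the latest value for every key.
    assertTrue(Seq("none", "gzip", "snappy", "lz4").contains(codecName))
  }
}

object CodecParameterizedSketch {
  // JUnit looks this up as a static factory; it must return a java.util.Collection of
  // constructor-argument arrays, one array per test-class instantiation.
  @Parameters
  def parameters: java.util.Collection[Array[String]] = {
    val list = new java.util.ArrayList[Array[String]]()
    for (codec <- Seq("none", "gzip", "snappy", "lz4"))
      list.add(Array(codec))
    list
  }
}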