From bfdd5114776efc89c2f2076ec03019d5ee038b56 Mon Sep 17 00:00:00 2001 From: Joel Koshy Date: Tue, 19 May 2015 00:54:18 -0700 Subject: [PATCH] Rebased patch; incorporate edits and further changes from Manikumar --- core/src/main/scala/kafka/log/LogCleaner.scala | 123 +++++++++++++++------ .../scala/kafka/message/ByteBufferMessageSet.scala | 3 - .../test/scala/kafka/tools/TestLogCleaning.scala | 36 +++--- .../unit/kafka/log/LogCleanerIntegrationTest.scala | 63 +++++++---- core/src/test/scala/unit/kafka/log/LogTest.scala | 25 +---- 5 files changed, 157 insertions(+), 93 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index abea8b2..70ad661 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -17,21 +17,18 @@ package kafka.log +import java.io.{DataOutputStream, File} +import java.nio._ +import java.util.Date +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import com.yammer.metrics.core.Gauge import kafka.common._ import kafka.message._ -import kafka.utils._ import kafka.metrics.KafkaMetricsGroup +import kafka.utils._ import scala.collection._ -import scala.math -import java.nio._ -import java.util.Date -import java.io.File -import java.lang.IllegalStateException -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -import com.yammer.metrics.core.Gauge /** * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy. @@ -389,7 +386,6 @@ private[log] class Cleaner(val id: Int, * @param map The key=>offset mapping * @param retainDeletes Should delete tombstones be retained while cleaning this segment * - * TODO: Implement proper compression support */ private[log] def cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment, dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) { @@ -403,28 +399,31 @@ private[log] class Cleaner(val id: Int, throttler.maybeThrottle(messages.sizeInBytes) // check each message to see if it is to be retained var messagesRead = 0 - for (entry <- messages) { - messagesRead += 1 + for (entry <- messages.shallowIterator) { val size = MessageSet.entrySize(entry.message) - position += size stats.readMessage(size) - val key = entry.message.key - if (key != null) { - val foundOffset = map.get(key) - /* two cases in which we can get rid of a message: - * 1) if there exists a message with the same key but higher offset - * 2) if the message is a delete "tombstone" marker and enough time has passed - */ - val redundant = foundOffset >= 0 && entry.offset < foundOffset - val obsoleteDelete = !retainDeletes && entry.message.isNull - if (!redundant && !obsoleteDelete) { + if (entry.message.compressionCodec == NoCompressionCodec) { + if(shouldRetainMessage(source, map, retainDeletes, entry)) { ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset) stats.recopyMessage(size) } - } else { - stats.invalidMessage() + messagesRead += 1 + } + else { + val messages = ByteBufferMessageSet.deepIterator(entry.message) + val retainedMessages = messages.filter(messageAndOffset => { + messagesRead += 1 + shouldRetainMessage(source, map, retainDeletes, messageAndOffset) + }).toList + + if (retainedMessages.nonEmpty) { + val size = compressMessages(writeBuffer, entry.message.compressionCodec, retainedMessages) + stats.recopyMessage(size) + } } } + + position += messages.validBytes // if any messages are to be retained, write them out 
if(writeBuffer.position > 0) { writeBuffer.flip() @@ -439,7 +438,57 @@ private[log] class Cleaner(val id: Int, } restoreBuffers() } - + + private def compressMessages(buffer: ByteBuffer, compressionCodec: CompressionCodec, messages: Seq[MessageAndOffset]): Int = { + val messagesIterable = messages.toIterable.map(_.message) + if(messages.isEmpty) { + MessageSet.Empty.sizeInBytes + } else if(compressionCodec == NoCompressionCodec) { + for(messageOffset <- messages) + ByteBufferMessageSet.writeMessage(buffer, messageOffset.message, messageOffset.offset) + MessageSet.messageSetSize(messagesIterable) + } else { + var offset = -1L + val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messagesIterable) / 2, 1024), 1 << 16)) + messageWriter.write(codec = compressionCodec) { outputStream => + val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream)) + try { + for (messageOffset <- messages) { + val message = messageOffset.message + offset = messageOffset.offset + output.writeLong(offset) + output.writeInt(message.size) + output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) + } + } finally { + output.close() + } + } + ByteBufferMessageSet.writeMessage(buffer, messageWriter, offset) + messageWriter.size + MessageSet.LogOverhead + } + } + + private def shouldRetainMessage(source: kafka.log.LogSegment, + map: kafka.log.OffsetMap, + retainDeletes: Boolean, + entry: kafka.message.MessageAndOffset): Boolean = { + val key = entry.message.key + if (key != null) { + val foundOffset = map.get(key) + /* two cases in which we can get rid of a message: + * 1) if there exists a message with the same key but higher offset + * 2) if the message is a delete "tombstone" marker and enough time has passed + */ + val redundant = foundOffset >= 0 && entry.offset < foundOffset + val obsoleteDelete = !retainDeletes && entry.message.isNull + !redundant && !obsoleteDelete + } else { + stats.invalidMessage() + false + } + } + /** * Double the I/O buffer capacity */ @@ -542,13 +591,14 @@ private[log] class Cleaner(val id: Int, val startPosition = position for (entry <- messages) { val message = entry.message - val size = MessageSet.entrySize(message) - position += size if (message.hasKey) map.put(message.key, entry.offset) offset = entry.offset - stats.indexMessage(size) + stats.indexMessagesRead(1) } + position += messages.validBytes + stats.indexBytesRead(messages.validBytes) + // if we didn't read even one complete message, our read buffer may be too small if(position == startPosition) growBuffers() @@ -580,16 +630,19 @@ private case class CleanerStats(time: Time = SystemTime) { messagesWritten += 1 bytesWritten += size } - - def indexMessage(size: Int) { - mapMessagesRead += 1 + + def indexMessagesRead(size: Int) { + mapMessagesRead += size + } + + def indexBytesRead(size: Int) { mapBytesRead += size } - + def indexDone() { mapCompleteTime = time.milliseconds } - + def allDone() { endTime = time.milliseconds } diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 9dfe914..7f97279 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -255,9 +255,6 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi buffer.reset() this } else { - if (compactedTopic && targetCodec != NoCompressionCodec) - throw new 
InvalidMessageException("Compacted topic cannot accept compressed messages. " + - "Either the producer sent a compressed message or the topic has been configured with a broker-side compression codec.") // We need to crack open the message-set if any of these are true: // (i) messages are compressed, // (ii) this message-set is sent to a compacted topic (and so we need to verify that each message has a key) diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala index 8445894..dcbfbe1 100755 --- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala +++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala @@ -52,6 +52,11 @@ object TestLogCleaning { .describedAs("count") .ofType(classOf[java.lang.Long]) .defaultsTo(Long.MaxValue) + val messageCompressionOpt = parser.accepts("compression-type", "message compression type") + .withOptionalArg() + .describedAs("compressionType") + .ofType(classOf[java.lang.String]) + .defaultsTo("none") val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.") .withRequiredArg .describedAs("count") @@ -84,38 +89,39 @@ object TestLogCleaning { .withRequiredArg .describedAs("directory") .ofType(classOf[String]) - + val options = parser.parse(args:_*) - + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "An integration test for log cleaning.") - + if(options.has(dumpOpt)) { dumpLog(new File(options.valueOf(dumpOpt))) System.exit(0) } - + CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt, numMessagesOpt) - + // parse options val messages = options.valueOf(numMessagesOpt).longValue + val compressionType = options.valueOf(messageCompressionOpt) val percentDeletes = options.valueOf(percentDeletesOpt).intValue val dups = options.valueOf(numDupsOpt).intValue val brokerUrl = options.valueOf(brokerOpt) val topicCount = options.valueOf(topicsOpt).intValue val zkUrl = options.valueOf(zkConnectOpt) val sleepSecs = options.valueOf(sleepSecsOpt).intValue - + val testId = new Random().nextInt(Int.MaxValue) val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray - + println("Producing %d messages...".format(messages)) - val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, percentDeletes) + val producedDataFile = produceMessages(brokerUrl, topics, messages, compressionType, dups, percentDeletes) println("Sleeping for %d seconds...".format(sleepSecs)) Thread.sleep(sleepSecs * 1000) println("Consuming messages...") val consumedDataFile = consumeMessages(zkUrl, topics) - + val producedLines = lineCount(producedDataFile) val consumedLines = lineCount(consumedDataFile) val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble @@ -169,9 +175,9 @@ object TestLogCleaning { } producedDeduped.close() consumedDeduped.close() + println("Validated " + total + " values, " + mismatched + " mismatches.") require(!produced.hasNext, "Additional values produced not found in consumer log.") require(!consumed.hasNext, "Additional values consumed not found in producer log.") - println("Validated " + total + " values, " + mismatched + " mismatches.") require(mismatched == 0, "Non-zero number of row mismatches.") // if all the checks worked out we can delete the deduped files producedDedupedFile.delete() @@ -233,10 +239,11 @@ object TestLogCleaning { }.start() new BufferedReader(new InputStreamReader(process.getInputStream()), 10*1024*1024) } - - def produceMessages(brokerUrl: String, - topics: 
Array[String], - messages: Long, + + def produceMessages(brokerUrl: String, + topics: Array[String], + messages: Long, + compressionType: String, dups: Int, percentDeletes: Int): File = { val producerProps = new Properties @@ -244,6 +251,7 @@ object TestLogCleaning { producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl) producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType) val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps) val rand = new Random(1) val keyCount = (messages / dups).toInt diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 3b5aa9d..471ddff 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -18,21 +18,28 @@ package kafka.log import java.io.File -import kafka.server.OffsetCheckpoint -import scala.collection._ -import org.junit._ import kafka.common.TopicAndPartition -import kafka.utils._ import kafka.message._ -import org.scalatest.junit.JUnitSuite -import junit.framework.Assert._ +import kafka.server.OffsetCheckpoint +import kafka.utils._ +import org.apache.kafka.common.record.CompressionType +import org.junit.Assert._ +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.runners.Parameterized.Parameters +import org.scalatest.junit.JUnit3Suite + +import scala.collection._ + /** * This is an integration test that tests the fully integrated log cleaner */ -class LogCleanerIntegrationTest extends JUnitSuite { - +@RunWith(value = classOf[Parameterized]) +class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite { + val time = new MockTime() val segmentSize = 100 val deleteDelay = 1000 @@ -40,16 +47,16 @@ class LogCleanerIntegrationTest extends JUnitSuite { val logDir = TestUtils.tempDir() var counter = 0 val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2)) - + @Test def cleanerTest() { val cleaner = makeCleaner(parts = 3) val log = cleaner.logs.get(topics(0)) - val appends = writeDups(numKeys = 100, numDups = 3, log) + val appends = writeDups(numKeys = 100, numDups = 3, log, CompressionCodec.getCompressionCodec(compressionCodec)) val startSize = log.size cleaner.startup() - + val lastCleaned = log.activeSegment.baseOffset // wait until we clean up to base_offset of active segment - minDirtyMessages cleaner.awaitCleaned("log", 0, lastCleaned) @@ -57,9 +64,9 @@ class LogCleanerIntegrationTest extends JUnitSuite { val read = readFromLog(log) assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap) assertTrue(startSize > log.size) - + // write some more stuff and validate again - val appends2 = appends ++ writeDups(numKeys = 100, numDups = 3, log) + val appends2 = appends ++ writeDups(numKeys = 100, numDups = 3, log, CompressionCodec.getCompressionCodec(compressionCodec)) val lastCleaned2 = log.activeSegment.baseOffset cleaner.awaitCleaned("log", 0, lastCleaned2) val read2 = readFromLog(log) @@ -79,19 +86,25 @@ class LogCleanerIntegrationTest extends JUnitSuite { cleaner.shutdown() } - + def readFromLog(log: 
Log): Iterable[(Int, Int)] = { - for(segment <- log.logSegments; message <- segment.log) yield { - val key = TestUtils.readString(message.message.key).toInt - val value = TestUtils.readString(message.message.payload).toInt + for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- { + // create single message iterator or deep iterator depending on compression codec + if (entry.message.compressionCodec == NoCompressionCodec) + Stream.cons(entry, Stream.empty).iterator + else + ByteBufferMessageSet.deepIterator(entry.message) + }) yield { + val key = TestUtils.readString(messageAndOffset.message.key).toInt + val value = TestUtils.readString(messageAndOffset.message.payload).toInt key -> value } } - - def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = { + + def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec): Seq[(Int, Int)] = { for(dup <- 0 until numDups; key <- 0 until numKeys) yield { val count = counter - log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true) + log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes), assignOffsets = true) counter += 1 (key, count) } @@ -128,4 +141,14 @@ class LogCleanerIntegrationTest extends JUnitSuite { time = time) } +} + +object LogCleanerIntegrationTest { + @Parameters + def parameters: java.util.Collection[Array[String]] = { + val list = new java.util.ArrayList[Array[String]]() + for (codec <- CompressionType.values) + list.add(Array(codec.name)) + list + } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 76d3bfd..8e095d6 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -337,6 +337,7 @@ class LogTest extends JUnitSuite { val messageSetWithUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage, keyedMessage) val messageSetWithOneUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage) val messageSetWithCompressedKeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage) + val messageSetWithCompressedUnkeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage, unkeyedMessage) val messageSetWithKeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage) val messageSetWithKeyedMessages = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage, anotherKeyedMessage) @@ -356,8 +357,8 @@ class LogTest extends JUnitSuite { case e: InvalidMessageException => // this is good } try { - log.append(messageSetWithCompressedKeyedMessage) - fail("Compacted topics cannot accept compressed messages.") + log.append(messageSetWithCompressedUnkeyedMessage) + fail("Compacted topics cannot accept a message without a key.") } catch { case e: InvalidMessageException => // this is good } @@ -365,25 +366,7 @@ class LogTest extends JUnitSuite { // the following should succeed without any InvalidMessageException log.append(messageSetWithKeyedMessage) log.append(messageSetWithKeyedMessages) - - // test that a compacted topic with broker-side compression type set to uncompressed can accept compressed messages - val uncompressedLog = new Log(logDir, logConfig.copy(compact = true, compressionType = "uncompressed"), - recoveryPoint = 0L, time.scheduler, time) - uncompressedLog.append(messageSetWithCompressedKeyedMessage) - 
uncompressedLog.append(messageSetWithKeyedMessage) - uncompressedLog.append(messageSetWithKeyedMessages) - try { - uncompressedLog.append(messageSetWithUnkeyedMessage) - fail("Compacted topics cannot accept a message without a key.") - } catch { - case e: InvalidMessageException => // this is good - } - try { - uncompressedLog.append(messageSetWithOneUnkeyedMessage) - fail("Compacted topics cannot accept a message without a key.") - } catch { - case e: InvalidMessageException => // this is good - } + log.append(messageSetWithCompressedKeyedMessage) } /** -- 1.7.12.4
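
Note on the LogCleaner change: the retention decision implemented by cleanInto and shouldRetainMessage is easier to follow in isolation. The sketch below is a minimal, self-contained Scala model of that flow, not the patch's code: Entry, OffsetMap, shouldRetain and clean are hypothetical stand-ins for MessageAndOffset, kafka.log.OffsetMap, shouldRetainMessage and cleanInto, and the re-compression of the surviving messages is elided here.

object CleanerRetentionSketch {
  // Hypothetical stand-in for a (possibly compressed) log entry.
  case class Entry(key: Option[String],
                   offset: Long,
                   isTombstone: Boolean,
                   compressed: Boolean,
                   inner: Seq[Entry] = Seq.empty)

  // Hypothetical stand-in for the key => latest-offset map built in the first cleaning pass.
  class OffsetMap(latest: Map[String, Long]) {
    def get(key: String): Long = latest.getOrElse(key, -1L)
  }

  def shouldRetain(map: OffsetMap, retainDeletes: Boolean, e: Entry): Boolean = e.key match {
    case Some(k) =>
      val foundOffset = map.get(k)
      val redundant = foundOffset >= 0 && e.offset < foundOffset // a newer value exists for this key
      val obsoleteDelete = !retainDeletes && e.isTombstone       // tombstone whose grace period has passed
      !redundant && !obsoleteDelete
    case None => false // unkeyed messages are not valid in a compacted log
  }

  // Shallow pass over the segment: uncompressed entries are judged directly, while a
  // compressed wrapper is cracked open, filtered message by message, and kept only if
  // at least one inner message survives (the patch then re-compresses the survivors).
  def clean(entries: Seq[Entry], map: OffsetMap, retainDeletes: Boolean): Seq[Entry] =
    entries.flatMap { entry =>
      if (!entry.compressed) {
        if (shouldRetain(map, retainDeletes, entry)) Seq(entry) else Nil
      } else {
        val retained = entry.inner.filter(shouldRetain(map, retainDeletes, _))
        if (retained.nonEmpty) Seq(entry.copy(inner = retained)) else Nil
      }
    }
}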
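
The re-compression step performed by compressMessages writes each retained message as offset, size, and payload into a single compressed wrapper. A rough stand-alone sketch of that framing follows; it substitutes a plain java.util.zip.GZIPOutputStream for Kafka's CompressionFactory and MessageWriter plumbing, and Retained/recompress are illustrative names only.

import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.util.zip.GZIPOutputStream

object RecompressSketch {
  // Hypothetical stand-in for one retained (offset, payload) pair.
  case class Retained(offset: Long, payload: Array[Byte])

  // Write each survivor as [offset][length][bytes] into one compressed stream and
  // return the resulting blob; closing the DataOutputStream finishes the gzip stream.
  def recompress(retained: Seq[Retained]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(new GZIPOutputStream(bytes))
    try {
      for (r <- retained) {
        out.writeLong(r.offset)        // keep the inner message's original offset
        out.writeInt(r.payload.length)
        out.write(r.payload)
      }
    } finally {
      out.close()
    }
    bytes.toByteArray
  }
}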
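
The integration test now runs once per compression codec via JUnit 4's Parameterized runner, with the codec list supplied by a static @Parameters method (Scala's companion-object forwarder makes it visible to JUnit, as in the patch). A minimal sketch of that wiring, using an illustrative class name and a hard-coded codec list in place of CompressionType.values:

import org.junit.Assert.assertTrue
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters

// The runner instantiates the suite once per Array returned by parameters(),
// so every @Test method executes against each codec name.
@RunWith(classOf[Parameterized])
class CodecParamTestSketch(codecName: String) {
  @Test
  def runsOncePerCodec(): Unit = {
    // the real test writes duplicate keys with this codec and verifies the cleaned log
    assertTrue(codecName.nonEmpty)
  }
}

object CodecParamTestSketch {
  @Parameters
  def parameters: java.util.Collection[Array[String]] = {
    val list = new java.util.ArrayList[Array[String]]()
    for (codec <- Seq("none", "gzip", "snappy", "lz4"))
      list.add(Array(codec))
    list
  }
}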