From 795885c07be0f63d77501a2828db50a3a4e133da Mon Sep 17 00:00:00 2001
From: Joel Koshy
Date: Mon, 23 Feb 2015 06:40:56 -0800
Subject: [PATCH] Add compacted topic constraint checks - reject unkeyed messages; reject compressed messages if topic's broker-side compression is not uncompressed

---
 core/src/main/scala/kafka/log/Log.scala            |   2 +-
 core/src/main/scala/kafka/log/LogCleaner.scala     |  49 +++---
 .../main/scala/kafka/log/LogCleanerManager.scala   |  12 +-
 .../scala/kafka/message/ByteBufferMessageSet.scala |  26 ++-
 .../main/scala/kafka/server/OffsetManager.scala    |   1 +
 .../test/scala/unit/kafka/log/CleanerTest.scala    |  41 ++++-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 186 ++++++++++++++-------
 .../kafka/message/ByteBufferMessageSetTest.scala   |   4 +-
 8 files changed, 219 insertions(+), 102 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 846023b..65c26f7 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -288,7 +288,7 @@ class Log(val dir: File,
       // assign offsets to the message set
       val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
       try {
-        validMessages = validMessages.assignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec)
+        validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact)
       } catch {
         case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
       }
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index f8e7cd5..5991428 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -133,7 +133,7 @@ class LogCleaner(val config: CleanerConfig,
    * Update checkpoint file, removing topics and partitions that no longer exist
    */
  def updateCheckpoints(dataDir: File) {
-    cleanerManager.updateCheckpoints(dataDir, update=None);
+    cleanerManager.updateCheckpoints(dataDir, update=None)
  }
 
  /**
@@ -152,8 +152,7 @@ class LogCleaner(val config: CleanerConfig,
  }
 
  /**
-   * TODO:
-   * For testing, a way to know when work has completed. This method blocks until the
+   * For testing, a way to know when work has completed. This method blocks until the
   * cleaner has processed up to the given offset on the specified topic/partition
   */
  def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit = {
@@ -243,7 +242,7 @@ class LogCleaner(val config: CleanerConfig,
             "\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead),
                                                                                                stats.elapsedIndexSecs,
                                                                                                mb(stats.mapBytesRead)/stats.elapsedIndexSecs,
-                                                                                               100 * stats.elapsedIndexSecs.toDouble/stats.elapsedSecs) +
+                                                                                               100 * stats.elapsedIndexSecs/stats.elapsedSecs) +
             "\tBuffer utilization: %.1f%%%n".format(100 * stats.bufferUtilization) +
             "\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead),
                                                                                                stats.elapsedSecs - stats.elapsedIndexSecs,
@@ -253,6 +252,9 @@ class LogCleaner(val config: CleanerConfig,
             "\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead),
                                                                        100.0 * (1.0 - stats.messagesWritten.toDouble/stats.messagesRead))
    info(message)
+    if (stats.invalidMessagesRead > 0) {
+      warn("\tFound %d invalid messages during compaction.".format(stats.invalidMessagesRead))
+    }
  }
 }
@@ -374,7 +376,7 @@ private[log] class Cleaner(val id: Int,
    } catch {
      case e: LogCleaningAbortedException =>
        cleaned.delete()
-        throw e 
+        throw e
    }
  }
@@ -407,17 +409,20 @@ private[log] class Cleaner(val id: Int,
      position += size
      stats.readMessage(size)
      val key = entry.message.key
-      require(key != null, "Found null key in log segment %s which is marked as dedupe.".format(source.log.file.getAbsolutePath))
-      val foundOffset = map.get(key)
-      /* two cases in which we can get rid of a message:
-       *   1) if there exists a message with the same key but higher offset
-       *   2) if the message is a delete "tombstone" marker and enough time has passed
-       */
-      val redundant = foundOffset >= 0 && entry.offset < foundOffset
-      val obsoleteDelete = !retainDeletes && entry.message.isNull
-      if (!redundant && !obsoleteDelete) {
-        ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
-        stats.recopyMessage(size)
+      if (key != null) {
+        val foundOffset = map.get(key)
+        /* two cases in which we can get rid of a message:
+         *   1) if there exists a message with the same key but higher offset
+         *   2) if the message is a delete "tombstone" marker and enough time has passed
+         */
+        val redundant = foundOffset >= 0 && entry.offset < foundOffset
+        val obsoleteDelete = !retainDeletes && entry.message.isNull
+        if (!redundant && !obsoleteDelete) {
+          ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
+          stats.recopyMessage(size)
+        }
+      } else {
+        stats.invalidMessage()
      }
    }
    // if any messages are to be retained, write them out
@@ -536,10 +541,10 @@ private[log] class Cleaner(val id: Int,
    val startPosition = position
    for (entry <- messages) {
      val message = entry.message
-      require(message.hasKey)
      val size = MessageSet.entrySize(message)
      position += size
-      map.put(message.key, entry.offset)
+      if (message.hasKey)
+        map.put(message.key, entry.offset)
      offset = entry.offset
      stats.indexMessage(size)
    }
@@ -556,7 +561,8 @@ private[log] class Cleaner(val id: Int,
 * A simple struct for collecting stats about log cleaning
 */
 private case class CleanerStats(time: Time = SystemTime) {
-  var startTime, mapCompleteTime, endTime, bytesRead, bytesWritten, mapBytesRead, mapMessagesRead, messagesRead, messagesWritten = 0L
+  var startTime, mapCompleteTime, endTime, bytesRead, bytesWritten, mapBytesRead, mapMessagesRead, messagesRead,
+      messagesWritten, invalidMessagesRead = 0L
  var bufferUtilization = 0.0d
  clear()
@@ -564,6 +570,10 @@ private case class CleanerStats(time: Time = SystemTime) {
    messagesRead += 1
    bytesRead += size
  }
+
+  def invalidMessage() {
+    invalidMessagesRead += 1
+  }
 
  def recopyMessage(size: Int) {
    messagesWritten += 1
@@ -596,6 +606,7 @@ private case class CleanerStats(time: Time = SystemTime) {
    mapBytesRead = 0L
    mapMessagesRead = 0L
    messagesRead = 0L
+    invalidMessagesRead = 0L
    messagesWritten = 0L
    bufferUtilization = 0.0d
  }
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index fd87d90..351824b 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -122,8 +122,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
    inLock(lock) {
      abortAndPauseCleaning(topicAndPartition)
      resumeCleaning(topicAndPartition)
-      info("The cleaning for partition %s is aborted".format(topicAndPartition))
    }
+    info("The cleaning for partition %s is aborted".format(topicAndPartition))
  }
 
  /**
@@ -152,8 +152,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
      }
      while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
        pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
-      info("The cleaning for partition %s is aborted and paused".format(topicAndPartition))
    }
+    info("The cleaning for partition %s is aborted and paused".format(topicAndPartition))
  }
 
  /**
@@ -181,14 +181,14 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
  /**
   *  Check if the cleaning for a partition is in a particular state. The caller is expected to hold lock while making the call.
   */
-  def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean = {
+  private def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean = {
    inProgress.get(topicAndPartition) match {
-      case None => return false
+      case None => false
      case Some(state) =>
        if (state == expectedState)
-          return true
+          true
        else
-          return false
+          false
    }
  }
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index f46ad5c..be2f1bf 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -199,11 +199,14 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
  }
 
  /**
-   * Update the offsets for this message set. This method attempts to do an in-place conversion
-   * if there is no compression, but otherwise recopies the messages
+   * Update the offsets for this message set and do further validation on messages. This method attempts to do an
+   * in-place conversion if there is no compression, but otherwise recopies the messages
   */
-  private[kafka] def assignOffsets(offsetCounter: AtomicLong, sourceCodec: CompressionCodec, targetCodec: CompressionCodec): ByteBufferMessageSet = {
-    if(sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
+  private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: AtomicLong,
+                                                      sourceCodec: CompressionCodec,
+                                                      targetCodec: CompressionCodec,
+                                                      compactedTopic: Boolean = false): ByteBufferMessageSet = {
+    if(sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec && !compactedTopic) {
      // do an in-place conversion
      var position = 0
      buffer.mark()
@@ -215,8 +218,19 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
      buffer.reset()
      this
    } else {
-      // messages are compressed, crack open the messageset and recompress with correct offset
-      val messages = this.internalIterator(isShallow = false).map(_.message)
+      if (compactedTopic && targetCodec != NoCompressionCodec)
+        throw new InvalidMessageException("Compacted topic cannot accept compressed messages. " +
+          "Either the producer sent a compressed message or the topic has been configured with a broker-side compression codec.")
+      // We need to crack open the message-set if any of these are true:
+      //   (i) messages are compressed,
+      //   (ii) this message-set is sent to a compacted topic (and so we need to verify that each message has a key)
+      // If the broker is configured with a target compression codec then we need to recompress regardless of original codec
+      val messages = this.internalIterator(isShallow = false).map(messageAndOffset => {
+        if (compactedTopic && !messageAndOffset.message.hasKey)
+          throw new InvalidMessageException("Compacted topic cannot accept message without key.")
+
+        messageAndOffset.message
+      })
      new ByteBufferMessageSet(compressionCodec = targetCodec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*)
    }
  }
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 83d5264..c602a80 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -168,6 +168,7 @@ class OffsetManager(val config: OffsetManagerConfig,
    val props = new Properties
    props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString)
    props.put(LogConfig.CleanupPolicyProp, "compact")
+    props.put(LogConfig.CompressionTypeProp, "none")
    props
  }
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index d10e4f4..0a5ac2c 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -90,10 +90,40 @@ class CleanerTest extends JUnitSuite {
    assertTrue("None of the keys we deleted should still exist.",
               (0 until leo.toInt by 2).forall(!keys.contains(_)))
  }
+
+  @Test
+  def testCleaningWithUnkeyedMessages {
+    val cleaner = makeCleaner(Int.MaxValue)
+
+    // create a log with compaction turned off so we can append unkeyed messages
+    val log = makeLog(config = logConfig.copy(segmentSize = 1024, compact = false))
+
+    // append unkeyed messages
+    while(log.numberOfSegments < 2)
+      log.append(unkeyedMessage(log.logEndOffset.toInt))
+
+    val sizeWithUnkeyedMessages = log.size
+
+    // append keyed messages
+    while(log.numberOfSegments < 3)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
+    val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
+
+    // turn on compaction and compact the log
+    val compactedLog = makeLog(config = logConfig.copy(segmentSize = 1024))
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0))
+
+    assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log))
+    assertEquals("Log should only contain keyed messages after cleaning.", expectedSizeAfterCleaning, log.size)
+  }
 
  /* extract all the keys from a log */
-  def keysInLog(log: Log): Iterable[Int] = 
-    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).map(m => Utils.readString(m.message.key).toInt))
+  def keysInLog(log: Log): Iterable[Int] =
+    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => Utils.readString(m.message.key).toInt))
+
+  def unkeyedMessageCountInLog(log: Log) =
+    log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum
 
  def abortCheckDone(topicAndPartition: TopicAndPartition) {
    throw new LogCleaningAbortedException()
@@ -130,7 +160,7 @@ class CleanerTest extends JUnitSuite {
    // append some messages to the log
    var i = 0
    while(log.numberOfSegments < 10) {
-      log.append(TestUtils.singleMessageSet("hello".getBytes))
+      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
      i += 1
    }
@@ -220,7 +250,10 @@ class CleanerTest extends JUnitSuite {
 
  def message(key: Int, value: Int) =
    new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
-  
+
+  def unkeyedMessage(value: Int) =
+    new ByteBufferMessageSet(new Message(bytes=value.toString.getBytes))
+
  def deleteMessage(key: Int) =
    new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=null))
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c2dd8eb..1a4be70 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -62,10 +62,10 @@ class LogTest extends JUnitSuite {
    val set = TestUtils.singleMessageSet("test".getBytes())
 
    // create a log
-    val log = new Log(logDir, 
-                      logConfig.copy(segmentMs = 1 * 60 * 60L), 
-                      recoveryPoint = 0L, 
-                      scheduler = time.scheduler, 
+    val log = new Log(logDir,
+                      logConfig.copy(segmentMs = 1 * 60 * 60L),
+                      recoveryPoint = 0L,
+                      scheduler = time.scheduler,
                      time = time)
    assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
    time.sleep(log.config.segmentMs + 1)
@@ -151,7 +151,7 @@ class LogTest extends JUnitSuite {
  def testAppendAndReadWithSequentialOffsets() {
    val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
    val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
-    
+
    for(i <- 0 until messages.length)
      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
    for(i <- 0 until messages.length) {
@@ -161,7 +161,7 @@ class LogTest extends JUnitSuite {
      val read = log.read(i, 100, Some(i+1)).messageSet.head
      assertEquals("Offset read should match order appended.", i, read.offset)
      assertEquals("Message should match appended.", messages(i/2), read.message)
    }
    assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).messageSet.size)
  }
-  
+
  /**
   * This test appends a bunch of messages with non-sequential offsets and checks that we can read the correct message
   * from any offset less than the logEndOffset including offsets not appended.
@@ -171,7 +171,7 @@ class LogTest extends JUnitSuite {
    val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
    val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
    val messages = messageIds.map(id => new Message(id.toString.getBytes))
-    
+
    // now test the case that we give the offsets and use non-sequential offsets
    for(i <- 0 until messages.length)
      log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false)
@@ -182,27 +182,27 @@ class LogTest extends JUnitSuite {
      assertEquals("Message should match appended.", messages(idx), read.message)
    }
  }
-  
+
  /**
   * This test covers an odd case where we have a gap in the offsets that falls at the end of a log segment.
   * Specifically we create a log where the last message in the first segment has offset 0. If we
-   * then read offset 1, we should expect this read to come from the second segment, even though the 
+   * then read offset 1, we should expect this read to come from the second segment, even though the
   * first segment has the greatest lower bound on the offset.
   */
  @Test
  def testReadAtLogGap() {
    val log = new Log(logDir, logConfig.copy(segmentSize = 300), recoveryPoint = 0L, time.scheduler, time = time)
-    
+
    // keep appending until we have two segments with only a single message in the second segment
    while(log.numberOfSegments == 1)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes))) 
-    
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
+
    // now manually truncate off all but one message from the first segment to create a gap in the messages
    log.logSegments.head.truncateTo(1)
-    
+
    assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).messageSet.head.offset)
  }
-  
+
  /**
   * Test reading at the boundary of the log, specifically
   * - reading from the logEndOffset should give an empty message set
@@ -250,13 +250,13 @@ class LogTest extends JUnitSuite {
    }
    val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).messageSet
    assertEquals("Should be no more messages", 0, lastRead.size)
-    
+
    // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure
    TestUtils.retry(1000L){
      assertTrue("Log role should have forced flush", log.recoveryPoint >= log.activeSegment.baseOffset)
    }
  }
-  
+
  /**
   * Test reads at offsets that fall within compressed message set boundaries.
   */
  @Test
  def testCompressedMessages() {
    /* this log should roll after every messageset */
    val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
-    
+
    /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
    log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
    log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
-    
+
    def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).messageSet.head.message)
-    
+
    /* we should always get the first message in the compressed set when reading any offset in the set */
    assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
    assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset)
    assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
    assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
  }
-  
+
  /**
   * Test garbage collecting old segments
   */
@@ -289,7 +289,7 @@ class LogTest extends JUnitSuite {
    val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
    for(i <- 0 until messagesToAppend)
      log.append(TestUtils.singleMessageSet(i.toString.getBytes))
-    
+
    var currOffset = log.logEndOffset
    assertEquals(currOffset, messagesToAppend)
 
@@ -300,10 +300,10 @@ class LogTest extends JUnitSuite {
    assertEquals("We should still have one segment left", 1, log.numberOfSegments)
    assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true))
    assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
-    assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append", 
+    assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
                 currOffset,
                 log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset)
-    
+
    // cleanup the log
    log.delete()
  }
@@ -328,6 +328,64 @@ class LogTest extends JUnitSuite {
    }
  }
 
+  @Test
+  def testCompactedTopicConstraints() {
+    val keyedMessage = new Message(bytes = "this message has a key".getBytes, key = "and here it is".getBytes)
+    val anotherKeyedMessage = new Message(bytes = "this message also has a key".getBytes, key = "another key".getBytes)
+    val unkeyedMessage = new Message(bytes = "this message does not have a key".getBytes)
+
+    val messageSetWithUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage, keyedMessage)
+    val messageSetWithOneUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage)
+    val messageSetWithCompressedKeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage)
+
+    val messageSetWithKeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage)
+    val messageSetWithKeyedMessages = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage, anotherKeyedMessage)
+
+    val log = new Log(logDir, logConfig.copy(compact = true), recoveryPoint = 0L, time.scheduler, time)
+
+    try {
+      log.append(messageSetWithUnkeyedMessage)
+      fail("Compacted topics cannot accept a message without a key.")
+    } catch {
+      case e: InvalidMessageException => // this is good
+    }
+    try {
+      log.append(messageSetWithOneUnkeyedMessage)
+      fail("Compacted topics cannot accept a message without a key.")
+    } catch {
+      case e: InvalidMessageException => // this is good
+    }
+    try {
+      log.append(messageSetWithCompressedKeyedMessage)
+      fail("Compacted topics cannot accept compressed messages.")
+    } catch {
+      case e: InvalidMessageException => // this is good
+    }
+
+    // the following should succeed without any InvalidMessageException
+    log.append(messageSetWithKeyedMessage)
+    log.append(messageSetWithKeyedMessages)
+
+    // test that a compacted topic with broker-side compression type set to uncompressed can accept compressed messages
+    val uncompressedLog = new Log(logDir, logConfig.copy(compact = true, compressionType = "uncompressed"),
+                                  recoveryPoint = 0L, time.scheduler, time)
+    uncompressedLog.append(messageSetWithCompressedKeyedMessage)
+    uncompressedLog.append(messageSetWithKeyedMessage)
+    uncompressedLog.append(messageSetWithKeyedMessages)
+    try {
+      uncompressedLog.append(messageSetWithUnkeyedMessage)
+      fail("Compacted topics cannot accept a message without a key.")
+    } catch {
+      case e: InvalidMessageException => // this is good
+    }
+    try {
+      uncompressedLog.append(messageSetWithOneUnkeyedMessage)
+      fail("Compacted topics cannot accept a message without a key.")
+    } catch {
+      case e: InvalidMessageException => // this is good
+    }
+  }
+
  /**
   * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
   * setting and checking that an exception is thrown.
@@ -369,13 +427,13 @@ class LogTest extends JUnitSuite {
    val numIndexEntries = log.activeSegment.index.entries
    val lastOffset = log.logEndOffset
    log.close()
-    
+
    log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time)
    assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
    log.close()
-    
+
    // test recovery case
    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
    assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
@@ -383,7 +441,7 @@ class LogTest extends JUnitSuite {
    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
    log.close()
  }
-  
+
  /**
   * Test that if we manually delete an index segment it is rebuilt when the log is re-opened
   */
@@ -397,12 +455,12 @@ class LogTest extends JUnitSuite {
      log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
    val indexFiles = log.logSegments.map(_.index.file)
    log.close()
-    
+
    // delete all the index files
    indexFiles.foreach(_.delete())
-    
+
    // reopen the log
-    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)    
+    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
    assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
    for(i <- 0 until numMessages)
      assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
@@ -425,10 +483,10 @@ class LogTest extends JUnitSuite {
 
    for (i<- 1 to msgPerSeg)
      log.append(set)
-    
+
    assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
    assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset)
-    
+
    val lastOffset = log.logEndOffset
    val size = log.size
    log.truncateTo(log.logEndOffset) // keep the entire log
@@ -446,7 +504,7 @@ class LogTest extends JUnitSuite {
 
    for (i<- 1 to msgPerSeg)
      log.append(set)
-    
+
    assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
    assertEquals("Should be back to original size", log.size, size)
    log.truncateFullyAndStartAt(log.logEndOffset - (msgPerSeg - 1))
@@ -497,23 +555,23 @@ class LogTest extends JUnitSuite {
  def testBogusIndexSegmentsAreRemoved() {
    val bogusIndex1 = Log.indexFilename(logDir, 0)
    val bogusIndex2 = Log.indexFilename(logDir, 5)
-    
+
    val set = TestUtils.singleMessageSet("test".getBytes())
-    val log = new Log(logDir, 
-                      logConfig.copy(segmentSize = set.sizeInBytes * 5, 
-                                     maxIndexSize = 1000, 
+    val log = new Log(logDir,
+                      logConfig.copy(segmentSize = set.sizeInBytes * 5,
+                                     maxIndexSize = 1000,
                                     indexInterval = 1),
                      recoveryPoint = 0L,
                      time.scheduler,
                      time)
-    
+
    assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
    assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
-    
+
    // check that we can append to the log
    for(i <- 0 until 10)
      log.append(set)
-    
+
    log.delete()
  }
@@ -523,22 +581,22 @@ class LogTest extends JUnitSuite {
  @Test
  def testReopenThenTruncate() {
    val set = TestUtils.singleMessageSet("test".getBytes())
-    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, 
-                                maxIndexSize = 1000, 
+    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
+                                maxIndexSize = 1000,
                                indexInterval = 10000)
 
    // create a log
-    var log = new Log(logDir, 
+    var log = new Log(logDir,
                      config,
                      recoveryPoint = 0L,
                      time.scheduler,
                      time)
-    
+
    // add enough messages to roll over several segments then close and re-open and attempt to truncate
    for(i <- 0 until 100)
      log.append(set)
    log.close()
-    log = new Log(logDir, 
+    log = new Log(logDir,
                  config,
                  recoveryPoint = 0L,
                  time.scheduler,
@@ -547,7 +605,7 @@ class LogTest extends JUnitSuite {
    assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
    assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
  }
-  
+
  /**
   * Test that deleted files are deleted after the appropriate time.
   */
  @Test
  def testAsyncDelete() {
    val set = TestUtils.singleMessageSet("test".getBytes())
    val asyncDeleteMs = 1000
-    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, 
-                                fileDeleteDelayMs = asyncDeleteMs, 
-                                maxIndexSize = 1000, 
+    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
+                                fileDeleteDelayMs = asyncDeleteMs,
+                                maxIndexSize = 1000,
                                indexInterval = 10000)
    val log = new Log(logDir,
                      config,
-                      recoveryPoint = 0L, 
+                      recoveryPoint = 0L,
                      time.scheduler,
                      time)
-    
+
    // append some messages to create some segments
    for(i <- 0 until 100)
      log.append(set)
-    
+
    // files should be renamed
    val segments = log.logSegments.toArray
    val oldFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
    log.deleteOldSegments((s) => true)
-    
+
    assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
-    assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) && 
+    assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) &&
                                                                 segments.forall(_.index.file.getName.endsWith(Log.DeletedFileSuffix)))
    assertTrue("The .deleted files should still be there.", segments.forall(_.log.file.exists) &&
                                                            segments.forall(_.index.file.exists))
    assertTrue("The original file should be gone.", oldFiles.forall(!_.exists))
-    
+
    // when enough time passes the files should be deleted
    val deletedFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
    time.sleep(asyncDeleteMs + 1)
    assertTrue("Files should all be gone.", deletedFiles.forall(!_.exists))
  }
-  
+
  /**
   * Any files ending in .deleted should be removed when the log is re-opened.
   */
@@ -599,22 +657,22 @@ class LogTest extends JUnitSuite {
                      recoveryPoint = 0L,
                      time.scheduler,
                      time)
-    
+
    // append some messages to create some segments
    for(i <- 0 until 100)
      log.append(set)
-    
+
    log.deleteOldSegments((s) => true)
    log.close()
-    
-    log = new Log(logDir, 
+
+    log = new Log(logDir,
                  config,
                  recoveryPoint = 0L,
                  time.scheduler,
                  time)
    assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
  }
-  
+
  @Test
  def testAppendMessageWithNullPayload() {
    val log = new Log(logDir,
@@ -627,9 +685,9 @@ class LogTest extends JUnitSuite {
    assertEquals(0, messageSet.head.offset)
    assertTrue("Message payload should be null.", messageSet.head.message.isNull)
  }
-  
+
  @Test
-  def testCorruptLog() { 
+  def testCorruptLog() {
    // append some messages to create some segments
    val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
    val set = TestUtils.singleMessageSet("test".getBytes())
@@ -647,11 +705,11 @@ class LogTest extends JUnitSuite {
      log.append(set)
    val messages = log.logSegments.flatMap(_.log.iterator.toList)
    log.close()
-    
+
    // corrupt index and log by appending random bytes
    TestUtils.appendNonsenseToFile(log.activeSegment.index.file, TestUtils.random.nextInt(1024) + 1)
    TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
-    
+
    // attempt recovery
    log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
    assertEquals(numMessages, log.logEndOffset)
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 73a2637..07bc317 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -147,11 +147,11 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
    // check uncompressed offsets
    checkOffsets(messages, 0)
    var offset = 1234567
-    checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec, NoCompressionCodec), offset)
+    checkOffsets(messages.validateMessagesAndAssignOffsets(new AtomicLong(offset), NoCompressionCodec, NoCompressionCodec), offset)
 
    // check compressed messages
    checkOffsets(compressedMessages, 0)
-    checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec, DefaultCompressionCodec), offset)
+    checkOffsets(compressedMessages.validateMessagesAndAssignOffsets(new AtomicLong(offset), DefaultCompressionCodec, DefaultCompressionCodec), offset)
  }
 
  /* check that offsets are assigned based on byte offset from the given base offset */
-- 
2.1.2
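
A quick illustration of the producer-visible behavior these checks introduce. This snippet is not part of the patch; it is a minimal sketch in the spirit of the new unit tests, and it assumes it runs inside the kafka package (validateMessagesAndAssignOffsets is private[kafka]):

import java.util.concurrent.atomic.AtomicLong
import kafka.message._

object CompactedTopicConstraintsSketch extends App {
  val keyed   = new Message(bytes = "value".getBytes, key = "key".getBytes)
  val unkeyed = new Message(bytes = "value".getBytes)

  // An unkeyed message is rejected when the message set targets a compacted topic.
  try {
    new ByteBufferMessageSet(NoCompressionCodec, unkeyed)
      .validateMessagesAndAssignOffsets(new AtomicLong(0), NoCompressionCodec, NoCompressionCodec, compactedTopic = true)
  } catch {
    case e: InvalidMessageException => println("rejected: " + e.getMessage)
  }

  // A compressed message set is rejected unless the topic's broker-side
  // compression type resolves to uncompressed (targetCodec == NoCompressionCodec).
  try {
    new ByteBufferMessageSet(GZIPCompressionCodec, keyed)
      .validateMessagesAndAssignOffsets(new AtomicLong(0), GZIPCompressionCodec, GZIPCompressionCodec, compactedTopic = true)
  } catch {
    case e: InvalidMessageException => println("rejected: " + e.getMessage)
  }

  // A keyed, compressed set IS accepted when the broker-side compression type is
  // "uncompressed": the set is cracked open, each message's key is validated, and
  // the set is rewritten without compression.
  new ByteBufferMessageSet(GZIPCompressionCodec, keyed)
    .validateMessagesAndAssignOffsets(new AtomicLong(0), GZIPCompressionCodec, NoCompressionCodec, compactedTopic = true)
}

This is also why the patch pins the compacted offsets topic to compression type "none" in OffsetManager: with that broker-side setting, compressed offset-commit message sets remain acceptable and are stored uncompressed.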