From 0d565a41c80e9aaf00d1ae36484d16838dd5bd7b Mon Sep 17 00:00:00 2001 From: Jay Kreps Date: Wed, 3 Jul 2013 17:07:05 -0700 Subject: [PATCH] Implement flush management support by adding the new concept of "recovery point" and keeping a checkpoint of recovery points. Remove the concept of "clean shutdown", now we just force a checkpoint at shutdown. --- core/src/main/scala/kafka/log/Log.scala | 111 ++++++++++---------- core/src/main/scala/kafka/log/LogManager.scala | 36 +++++-- core/src/main/scala/kafka/log/LogSegment.scala | 7 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 12 +-- core/src/main/scala/kafka/server/KafkaServer.scala | 1 + .../src/test/scala/other/kafka/StressTestLog.scala | 2 +- .../scala/other/kafka/TestLinearWriteSpeed.scala | 101 +++++++++++++----- .../scala/other/kafka/TestLogPerformance.scala | 58 ---------- .../test/scala/unit/kafka/log/CleanerTest.scala | 2 +- .../unit/kafka/log/LogCleanerIntegrationTest.scala | 2 +- .../test/scala/unit/kafka/log/LogManagerTest.scala | 39 ++++++- core/src/test/scala/unit/kafka/log/LogTest.scala | 56 +++++----- .../server/HighwatermarkPersistenceTest.scala | 1 + .../unit/kafka/server/ServerShutdownTest.scala | 7 +- 14 files changed, 246 insertions(+), 189 deletions(-) delete mode 100644 core/src/test/scala/other/kafka/TestLogPerformance.scala diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index d1c3d72..abf1db7 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -38,19 +38,17 @@ import com.yammer.metrics.core.Gauge * for a given segment. * * @param dir The directory in which log segments are created. - * @param maxSegmentSize The maximum segment size in bytes. - * @param maxMessageSize The maximum message size in bytes (including headers) that will be allowed in this log. - * @param flushInterval The number of messages that can be appended to this log before we force a flush of the log. - * @param rollIntervalMs The time after which we will force the rolling of a new log segment - * @param needsRecovery Should we run recovery on this log when opening it? This should be done if the log wasn't cleanly shut down. - * @param maxIndexSize The maximum size of an offset index in this log. The index of the active log segment will be pre-allocated to this size. - * @param indexIntervalBytes The (approximate) number of bytes between entries in the offset index for this log. + * @param config The log configuration settings + * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk + * @param scheduler The thread pool scheduler used for background actions + * @param recoverFrom The position from which to begin running recovery (i.e. 
the last flush point in the log) + * @param time The time instance used for checking the clock * */ @threadsafe class Log(val dir: File, @volatile var config: LogConfig, - val needsRecovery: Boolean, + @volatile var recoveryPoint: Long = 0L, val scheduler: Scheduler, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { @@ -59,14 +57,12 @@ class Log(val dir: File, /* A lock that guards all modifications to the log */ private val lock = new Object - /* The current number of unflushed messages appended to the write */ - private val unflushed = new AtomicInteger(0) - /* last time it was flushed */ private val lastflushedTime = new AtomicLong(time.milliseconds) /* the actual segments of the log */ - private val segments: ConcurrentNavigableMap[Long,LogSegment] = loadSegments() + private val segments: ConcurrentNavigableMap[Long,LogSegment] = new ConcurrentSkipListMap[Long, LogSegment] + loadSegments() /* The number of times the log has been truncated */ private val truncates = new AtomicInteger(0) @@ -86,10 +82,7 @@ class Log(val dir: File, def name = dir.getName() /* Load the log segments from the log files on disk */ - private def loadSegments(): ConcurrentNavigableMap[Long, LogSegment] = { - // open all the segments read-only - val logSegments = new ConcurrentSkipListMap[Long, LogSegment] - + private def loadSegments() { // create the log directory if it doesn't exist dir.mkdirs() @@ -145,37 +138,40 @@ class Log(val dir: File, error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) segment.recover(config.maxMessageSize) } - logSegments.put(start, segment) + segments.put(start, segment) } } if(logSegments.size == 0) { // no existing segments, create a new mutable segment beginning at offset 0 - logSegments.put(0, - new LogSegment(dir = dir, + segments.put(0, new LogSegment(dir = dir, startOffset = 0, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize)) } else { // reset the index size of the currently active log segment to allow more entries - val active = logSegments.lastEntry.getValue - active.index.resize(config.maxIndexSize) - - // run recovery on the active segment if necessary - if(needsRecovery) { - info("Recovering active segment of %s.".format(name)) - active.recover(config.maxMessageSize) + activeSegment.index.resize(config.maxIndexSize) + + // run recovery on any unflushed segments + val unflushed = logSegments(segments.floorKey(this.recoveryPoint), Long.MaxValue).iterator + while(unflushed.hasNext) { + val curr = unflushed.next + info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name)) + val truncatedBytes = curr.recover(config.maxMessageSize) + if(truncatedBytes > 0) { + // we had an invalid message, delete all remaining log + warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset)) + unflushed.foreach(deleteSegment) + } } } - // Check for the index file of every segment, if it's empty or its last offset is greater than its base offset. - for (s <- asIterable(logSegments.values)) { + // sanity check the index file of every segment, if it's empty or its last offset is greater than its base offset. 
+ for (s <- logSegments) { require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset, "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d" .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset)) } - - logSegments } /** @@ -263,7 +259,8 @@ class Log(val dir: File, trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s" .format(this.name, appendInfo.firstOffset, nextOffset.get(), validMessages)) - maybeFlush(appendInfo.shallowCount) + if(unflushedMessages >= config.flushInterval) + flush() appendInfo } @@ -443,11 +440,8 @@ class Log(val dir: File, * @return The newly rolled segment */ def roll(): LogSegment = { + val start = time.nanoseconds lock synchronized { - // flush the log to ensure that only the active segment needs to be recovered - if(!segments.isEmpty()) - flush() - val newOffset = logEndOffset val logFile = logFilename(dir, newOffset) val indexFile = indexFilename(dir, newOffset) @@ -456,7 +450,6 @@ class Log(val dir: File, file.delete() } - info("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName) segments.lastEntry() match { case null => case entry => entry.getValue.index.trimToValidSize() @@ -468,33 +461,43 @@ class Log(val dir: File, val prev = addSegment(segment) if(prev != null) throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset)) + + // schedule an asynchronous flush of the old segment + scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L) + + info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0))) + segment } } - + /** - * Flush the log if necessary - * @param numberOfMessages The number of messages that are being appended + * The number of messages appended to the log since the last flush */ - private def maybeFlush(numberOfMessages : Int) { - if(unflushed.addAndGet(numberOfMessages) >= config.flushInterval) - flush() - } + def unflushedMessages() = this.logEndOffset - this.recoveryPoint + + /** + * Flush all log segments + */ + def flush(): Unit = flush(this.logEndOffset) /** - * Flush this log file and associated index to the physical disk + * Flush log segments for all offsets up to offset-1 + * @param offset The offset to flush up to (non-inclusive); the new recovery point */ - def flush() : Unit = { - if (unflushed.get == 0) + def flush(offset: Long) : Unit = { + if (offset <= this.recoveryPoint) return - - debug("Flushing log '" + name + "' last flushed: " + lastFlushTime + " current time: " + - time.milliseconds + " unflushed = " + unflushed.get) + debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " + + time.milliseconds + " unflushed = " + unflushedMessages) + for(segment <- logSegments(this.recoveryPoint, offset)) + segment.flush() lock synchronized { - activeSegment.flush() - unflushed.set(0) - lastflushedTime.set(time.milliseconds) - } + if(offset > this.recoveryPoint) { + this.recoveryPoint = offset + lastflushedTime.set(time.milliseconds) + } + } } /** @@ -525,6 +528,7 @@ class Log(val dir: File, deletable.foreach(deleteSegment(_)) activeSegment.truncateTo(targetOffset) this.nextOffset.set(targetOffset) + this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) } truncates.getAndIncrement } @@ -544,6 +548,7 @@ class Log(val dir: File, indexIntervalBytes = 
config.indexInterval, maxIndexSize = config.maxIndexSize)) this.nextOffset.set(newOffset) + this.recoveryPoint = math.min(newOffset, this.recoveryPoint) truncates.getAndIncrement } } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 0d567e4..8ac635e 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -23,6 +23,7 @@ import kafka.utils._ import scala.collection._ import kafka.common.{TopicAndPartition, KafkaException} import kafka.server.KafkaConfig +import kafka.server.OffsetCheckpoint /** @@ -41,11 +42,12 @@ class LogManager(val logDirs: Array[File], val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig, val flushCheckMs: Long, + val flushCheckpointMs: Long, val retentionCheckMs: Long, scheduler: Scheduler, private val time: Time) extends Logging { - val CleanShutdownFile = ".kafka_cleanshutdown" + val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" val LockFile = ".lock" val InitialTaskDelayMs = 30*1000 private val logCreationLock = new Object @@ -53,6 +55,7 @@ class LogManager(val logDirs: Array[File], createAndValidateLogDirs(logDirs) private var dirLocks = lockLogDirs(logDirs) + private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap loadLogs(logDirs) private val cleaner: LogCleaner = @@ -102,10 +105,7 @@ class LogManager(val logDirs: Array[File], */ private def loadLogs(dirs: Seq[File]) { for(dir <- dirs) { - /* check if this set of logs was shut down cleanly */ - val cleanShutDownFile = new File(dir, CleanShutdownFile) - val needsRecovery = !cleanShutDownFile.exists - cleanShutDownFile.delete + val recoveryPoints = this.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { @@ -116,7 +116,7 @@ class LogManager(val logDirs: Array[File], val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, - needsRecovery, + recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = this.logs.put(topicPartition, log) @@ -146,6 +146,11 @@ class LogManager(val logDirs: Array[File], delay = InitialTaskDelayMs, period = flushCheckMs, TimeUnit.MILLISECONDS) + scheduler.schedule("kafka-recovery-point-checkpoint", + checkpointRecoveryPointOffsets, + delay = InitialTaskDelayMs, + period = flushCheckpointMs, + TimeUnit.MILLISECONDS) } if(cleanerConfig.enableCleaner) cleaner.startup() @@ -162,8 +167,8 @@ class LogManager(val logDirs: Array[File], Utils.swallow(cleaner.shutdown()) // close the logs allLogs.foreach(_.close()) - // mark that the shutdown was clean by creating the clean shutdown marker file - logDirs.foreach(dir => Utils.swallow(new File(dir, CleanShutdownFile).createNewFile())) + // update the last flush point + checkpointRecoveryPointOffsets() } finally { // regardless of whether the close succeeded, we need to unlock the data directories dirLocks.foreach(_.destroy()) @@ -172,6 +177,19 @@ class LogManager(val logDirs: Array[File], } /** + * Write out the current recovery point for all logs to a text file in the log directory + * to avoid recovering the whole log on startup. 
+ */ + def checkpointRecoveryPointOffsets() { + val recoveryPointsByDir = this.logsByTopicPartition.groupBy(_._2.dir.getParent.toString) + for(dir <- logDirs) { + val recoveryPoints = recoveryPointsByDir.get(dir.toString) + if(recoveryPoints.isDefined) + this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint)) + } + } + + /** * Get the log if it exists, otherwise return None */ def getLog(topicAndPartition: TopicAndPartition): Option[Log] = { @@ -200,7 +218,7 @@ class LogManager(val logDirs: Array[File], dir.mkdirs() log = new Log(dir, config, - needsRecovery = false, + recoveryPoint = 0L, scheduler, time) logs.put(topicAndPartition, log) diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 30d2e91..5162573 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -135,9 +135,11 @@ class LogSegment(val log: FileMessageSet, * * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this * is corrupt. + * + * @return The number of message truncated from the log */ @nonthreadsafe - def recover(maxMessageSize: Int) { + def recover(maxMessageSize: Int): Int = { index.truncate() var validBytes = 0 var lastIndexEntry = 0 @@ -157,9 +159,8 @@ class LogSegment(val log: FileMessageSet, logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage)) } val truncated = log.sizeInBytes - validBytes - if(truncated > 0) - warn("Truncated " + truncated + " invalid bytes from the log segment %s.".format(log.file.getAbsolutePath)) log.truncateTo(validBytes) + truncated } override def toString() = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")" diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 96cbd62..3fe0ed0 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -145,19 +145,19 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue)) /* the number of messages accumulated on a log partition before messages are flushed to disk */ - val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 10000, (1, Int.MaxValue)) + val logFlushIntervalMessages = props.getLongInRange("log.flush.interval.messages", Long.MaxValue, (1, Long.MaxValue)) /* the amount of time to wait before deleting a file from the filesystem */ val logDeleteDelayMs = props.getLongInRange("log.segment.delete.delay.ms", 60000, (0, Long.MaxValue)) - /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */ - val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt) - /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */ - val logFlushSchedulerIntervalMs = props.getInt("log.flush.scheduler.interval.ms", 3000) + val logFlushSchedulerIntervalMs = props.getLong("log.flush.scheduler.interval.ms", Long.MaxValue) /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */ - val logFlushIntervalMs = props.getInt("log.flush.interval.ms", 
logFlushSchedulerIntervalMs) + val logFlushIntervalMs = props.getLong("log.flush.interval.ms", logFlushSchedulerIntervalMs) + + /* the frequency with which we update the persistent record of the last flush which acts as the log recovery point */ + val logFlushOffsetCheckpointIntervalMs = props.getIntInRange("log.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue)) /* enable auto creation of topic on the server */ val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index e2f4e91..a3b7c08 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -183,6 +183,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg defaultConfig = defaultLogConfig, cleanerConfig = cleanerConfig, flushCheckMs = config.logFlushSchedulerIntervalMs, + flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs, retentionCheckMs = config.logCleanupIntervalMs, scheduler = kafkaScheduler, time = time) diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index c6e7a57..8fcd068 100644 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -37,7 +37,7 @@ object StressTestLog { config = LogConfig(segmentSize = 64*1024*1024, maxMessageSize = Int.MaxValue, maxIndexSize = 1024*1024), - needsRecovery = false, + recoveryPoint = 0L, scheduler = time.scheduler, time = time) val writer = new WriterThread(log) diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 36d52e7..c317eaa 100644 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -20,9 +20,16 @@ package kafka import java.io._ import java.nio._ import java.nio.channels._ +import java.util.Random +import kafka.log._ +import kafka.utils._ +import kafka.message._ import scala.math._ import joptsimple._ +/** + * This test does linear writes using either a kafka log or a file and measures throughput and latency. 
+ */ object TestLinearWriteSpeed { def main(args: Array[String]): Unit = { @@ -32,7 +39,7 @@ object TestLinearWriteSpeed { .describedAs("path") .ofType(classOf[java.lang.String]) .defaultsTo(System.getProperty("java.io.tmpdir")) - val bytesOpt = parser.accepts("bytes", "REQUIRED: The number of bytes to write.") + val bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.") .withRequiredArg .describedAs("num_bytes") .ofType(classOf[java.lang.Long]) @@ -40,7 +47,12 @@ object TestLinearWriteSpeed { .withRequiredArg .describedAs("num_bytes") .ofType(classOf[java.lang.Integer]) - val filesOpt = parser.accepts("files", "REQUIRED: The number of files.") + val messageSizeOpt = parser.accepts("message-size", "REQUIRED: The size of each message in the message set.") + .withRequiredArg + .describedAs("num_bytes") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024) + val filesOpt = parser.accepts("files", "REQUIRED: The number of logs or files.") .withRequiredArg .describedAs("num_files") .ofType(classOf[java.lang.Integer]) @@ -55,7 +67,9 @@ object TestLinearWriteSpeed { .describedAs("mb") .ofType(classOf[java.lang.Integer]) .defaultsTo(Integer.MAX_VALUE) - val mmapOpt = parser.accepts("mmap", "Mmap file.") + val mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.") + val channelOpt = parser.accepts("channel", "Do writes to file channesl.") + val logOpt = parser.accepts("log", "Do writes to kafka logs.") val options = parser.parse(args : _*) @@ -68,26 +82,34 @@ object TestLinearWriteSpeed { } var bytesToWrite = options.valueOf(bytesOpt).longValue - val mmap = options.has(mmapOpt) val bufferSize = options.valueOf(sizeOpt).intValue val numFiles = options.valueOf(filesOpt).intValue val reportingInterval = options.valueOf(reportingIntervalOpt).longValue val dir = options.valueOf(dirOpt) val maxThroughputBytes = options.valueOf(maxThroughputOpt).intValue * 1024L * 1024L val buffer = ByteBuffer.allocate(bufferSize) + val messageSize = options.valueOf(messageSizeOpt).intValue while(buffer.hasRemaining) buffer.put(123.asInstanceOf[Byte]) + val numMessages = bufferSize / (messageSize + MessageSet.LogOverhead) + val messageSet = new ByteBufferMessageSet(messages = (0 until numMessages).map(x => new Message(new Array[Byte](messageSize))): _*) val writables = new Array[Writable](numFiles) + val scheduler = new KafkaScheduler(1) + scheduler.startup() + val rand = new Random for(i <- 0 until numFiles) { - val file = new File(dir, "kafka-test-" + i + ".dat") - file.deleteOnExit() - val raf = new RandomAccessFile(file, "rw") - raf.setLength(bytesToWrite / numFiles) - if(mmap) - writables(i) = new MmapWritable(raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length())) - else - writables(i) = new ChannelWritable(raf.getChannel()) + if(options.has(mmapOpt)) { + writables(i) = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer) + } else if(options.has(channelOpt)) { + writables(i) = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer) + } else if(options.has(logOpt)) { + val segmentSize = rand.nextInt(512)*1024*1024 + 64*1024*1024 // vary size to avoid herd effect + writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(segmentSize=segmentSize), scheduler, messageSet) + } else { + System.err.println("Must specify what to write to with one of --log, --channel, or --mmap") + System.exit(1) + } } bytesToWrite = (bytesToWrite / numFiles) * numFiles @@ -101,15 +123,14 @@ object 
TestLinearWriteSpeed { var totalWritten = 0L var lastReport = beginTest while(totalWritten + bufferSize < bytesToWrite) { - buffer.rewind() val start = System.nanoTime - writables((count % numFiles).toInt.abs).write(buffer) + val writeSize = writables((count % numFiles).toInt.abs).write() val ellapsed = System.nanoTime - start maxLatency = max(ellapsed, maxLatency) totalLatency += ellapsed - written += bufferSize + written += writeSize count += 1 - totalWritten += bufferSize + totalWritten += writeSize if((start - lastReport)/(1000.0*1000.0) > reportingInterval.doubleValue) { val ellapsedSecs = (start - lastReport) / (1000.0*1000.0*1000.0) val mb = written / (1024.0*1024.0) @@ -118,7 +139,7 @@ object TestLinearWriteSpeed { written = 0 maxLatency = 0L totalLatency = 0L - } else if(written > maxThroughputBytes) { + } else if(written > maxThroughputBytes * (reportingInterval / 1000.0)) { // if we have written enough, just sit out this reporting interval val lastReportMs = lastReport / (1000*1000) val now = System.nanoTime / (1000*1000) @@ -129,21 +150,53 @@ object TestLinearWriteSpeed { } val elapsedSecs = (System.nanoTime - beginTest) / (1000.0*1000.0*1000.0) println(bytesToWrite / (1024.0 * 1024.0 * elapsedSecs) + " MB per sec") + scheduler.shutdown() } trait Writable { - def write(buffer: ByteBuffer) + def write(): Int + def close() } - class MmapWritable(val buffer: ByteBuffer) extends Writable { - def write(b: ByteBuffer) { - buffer.put(b) + class MmapWritable(val file: File, size: Long, val content: ByteBuffer) extends Writable { + file.deleteOnExit() + val raf = new RandomAccessFile(file, "rw") + raf.setLength(size) + val buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length()) + def write(): Int = { + buffer.put(content) + content.rewind() + content.limit + } + def close() { + raf.close() + } + } + + class ChannelWritable(val file: File, val content: ByteBuffer) extends Writable { + file.deleteOnExit() + val raf = new RandomAccessFile(file, "rw") + val channel = raf.getChannel + def write(): Int = { + channel.write(content) + content.rewind() + content.limit + } + def close() { + raf.close() } } - class ChannelWritable(val channel: FileChannel) extends Writable { - def write(b: ByteBuffer) { - channel.write(b) + class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet) extends Writable { + Utils.rm(dir) + val log = new Log(dir, config, 0L, scheduler, SystemTime) + def write(): Int = { + log.append(messages, true) + messages.sizeInBytes + } + def close() { + log.close() + Utils.rm(log.dir) } } diff --git a/core/src/test/scala/other/kafka/TestLogPerformance.scala b/core/src/test/scala/other/kafka/TestLogPerformance.scala deleted file mode 100644 index d91011e..0000000 --- a/core/src/test/scala/other/kafka/TestLogPerformance.scala +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import kafka.message._ -import kafka.utils.{SystemTime, TestUtils, Utils, KafkaScheduler} -import kafka.server.KafkaConfig - -object TestLogPerformance { - - def main(args: Array[String]): Unit = { - if(args.length < 4) - Utils.croak("USAGE: java " + getClass().getName() + " num_messages message_size batch_size compression_codec") - val numMessages = args(0).toInt - val messageSize = args(1).toInt - val batchSize = args(2).toInt - val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt) - val props = TestUtils.createBrokerConfig(0, -1) - val config = new KafkaConfig(props) - val dir = TestUtils.tempDir() - val scheduler = new KafkaScheduler(1) - val logConfig = LogConfig() - val log = new Log(dir, logConfig, needsRecovery = false, scheduler = scheduler, time = SystemTime) - val bytes = new Array[Byte](messageSize) - new java.util.Random().nextBytes(bytes) - val message = new Message(bytes) - val messages = new Array[Message](batchSize) - for(i <- 0 until batchSize) - messages(i) = message - val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = messages: _*) - val numBatches = numMessages / batchSize - val start = System.currentTimeMillis() - for(i <- 0 until numBatches) - log.append(messageSet) - log.close() - val elapsed = (System.currentTimeMillis() - start) / 1000.0 - val writtenBytes = MessageSet.entrySize(message) * numMessages - println("message size = " + MessageSet.entrySize(message)) - println("MB/sec: " + writtenBytes / elapsed / (1024.0 * 1024.0)) - Utils.rm(dir) - } - -} diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala index 4619d86..5a312bf 100644 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala @@ -195,7 +195,7 @@ class CleanerTest extends JUnitSuite { } def makeLog(dir: File = dir, config: LogConfig = logConfig) = - new Log(dir = dir, config = config, needsRecovery = false, scheduler = time.scheduler, time = time) + new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) def makeCleaner(capacity: Int) = new Cleaner(id = 0, diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 15e9b60..1de3ef0 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -102,7 +102,7 @@ class LogCleanerIntegrationTest extends JUnitSuite { dir.mkdirs() val log = new Log(dir = dir, LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, dedupe = true), - needsRecovery = false, + recoveryPoint = 0L, scheduler = time.scheduler, time = time) logs.put(TopicAndPartition("log", i), log) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 6916df4..b4bee33 100644 --- 
a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -24,6 +24,7 @@ import org.scalatest.junit.JUnit3Suite import kafka.server.KafkaConfig import kafka.common._ import kafka.utils._ +import kafka.server.OffsetCheckpoint class LogManagerTest extends JUnit3Suite { @@ -40,7 +41,15 @@ class LogManagerTest extends JUnit3Suite { override def setUp() { super.setUp() logDir = TestUtils.tempDir() - logManager = new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time) + logManager = new LogManager(logDirs = Array(logDir), + topicConfigs = Map(), + defaultConfig = logConfig, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 100000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + time = time) logManager.startup logDir = logManager.logDirs(0) } @@ -116,7 +125,7 @@ class LogManagerTest extends JUnit3Suite { logManager.shutdown() val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L) - logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time) + logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 100000L, 1000L, time.scheduler, time) logManager.startup // create a log @@ -156,7 +165,7 @@ class LogManagerTest extends JUnit3Suite { def testTimeBasedFlush() { logManager.shutdown() val config = logConfig.copy(flushMs = 1000) - logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time) + logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time) logManager.startup val log = logManager.createLog(TopicAndPartition(name, 0), config) val lastFlush = log.lastFlushTime @@ -178,7 +187,7 @@ class LogManagerTest extends JUnit3Suite { TestUtils.tempDir(), TestUtils.tempDir()) logManager.shutdown() - logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time) + logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time) // verify that logs are always assigned to the least loaded partition for(partition <- 0 until 20) { @@ -194,10 +203,30 @@ class LogManagerTest extends JUnit3Suite { */ def testTwoLogManagersUsingSameDirFails() { try { - new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time) + new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time) fail("Should not be able to create a second log manager instance with the same data directory") } catch { case e: KafkaException => // this is good } } + + /** + * Test that recovery points are correctly written out to disk + */ + def testCheckpointRecoveryPoints() { + val topicA = TopicAndPartition("test-a", 1) + val topicB = TopicAndPartition("test-b", 1) + val logA = this.logManager.createLog(topicA, logConfig) + val logB = this.logManager.createLog(topicB, logConfig) + for(i <- 0 until 50) + logA.append(TestUtils.singleMessageSet("test".getBytes())) + for(i <- 0 until 100) + logB.append(TestUtils.singleMessageSet("test".getBytes())) + logA.flush() + logB.flush() + logManager.checkpointRecoveryPointOffsets() + val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read() + assertEquals("Recovery point should equal checkpoint", checkpoints(topicA), 
logA.recoveryPoint) + assertEquals("Recovery point should equal checkpoint", checkpoints(topicB), logB.recoveryPoint) + } } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 7d41938..d0f823a 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -67,7 +67,7 @@ class LogTest extends JUnitSuite { // create a log val log = new Log(logDir, logConfig.copy(segmentMs = 1 * 60 * 60L), - needsRecovery = false, + recoveryPoint = 0L, scheduler = time.scheduler, time = time) time.sleep(log.config.segmentMs + 1) @@ -102,7 +102,7 @@ class LogTest extends JUnitSuite { val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages // create a log - val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, time.scheduler, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) // segments expire in size @@ -118,7 +118,7 @@ class LogTest extends JUnitSuite { @Test def testLoadEmptyLog() { createEmptyLogs(logDir, 0) - val log = new Log(logDir, logConfig, needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig, recoveryPoint = 0L, time.scheduler, time = time) log.append(TestUtils.singleMessageSet("test".getBytes)) } @@ -127,7 +127,7 @@ class LogTest extends JUnitSuite { */ @Test def testAppendAndReadWithSequentialOffsets() { - val log = new Log(logDir, logConfig.copy(segmentSize = 71), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time) val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray for(i <- 0 until messages.length) @@ -146,7 +146,7 @@ class LogTest extends JUnitSuite { */ @Test def testAppendAndReadWithNonSequentialOffsets() { - val log = new Log(logDir, logConfig.copy(segmentSize = 71), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val messages = messageIds.map(id => new Message(id.toString.getBytes)) @@ -169,7 +169,7 @@ class LogTest extends JUnitSuite { */ @Test def testReadAtLogGap() { - val log = new Log(logDir, logConfig.copy(segmentSize = 300), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 300), recoveryPoint = 0L, time.scheduler, time = time) // keep appending until we have two segments with only a single message in the second segment while(log.numberOfSegments == 1) @@ -189,7 +189,7 @@ class LogTest extends JUnitSuite { @Test def testReadOutOfRange() { createEmptyLogs(logDir, 1024) - val log = new Log(logDir, logConfig.copy(segmentSize = 1024), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 1024), recoveryPoint = 0L, time.scheduler, time = time) assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes) try { log.read(0, 1024) @@ -212,7 +212,7 @@ class LogTest extends JUnitSuite { @Test def testLogRolls() { /* create a multipart log with 100 messages */ - val log = new Log(logDir, logConfig.copy(segmentSize = 100), needsRecovery = 
false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time) val numMessages = 100 val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes)) messageSets.foreach(log.append(_)) @@ -228,6 +228,11 @@ class LogTest extends JUnitSuite { } val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)) assertEquals("Should be no more messages", 0, lastRead.size) + + // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure + TestUtils.retry(1000L){ + assertTrue("Log role should have forced flush", log.recoveryPoint >= log.activeSegment.baseOffset) + } } /** @@ -236,7 +241,7 @@ class LogTest extends JUnitSuite { @Test def testCompressedMessages() { /* this log should roll after every messageset */ - val log = new Log(logDir, logConfig.copy(segmentSize = 10), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 10), recoveryPoint = 0L, time.scheduler, time = time) /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) @@ -259,7 +264,7 @@ class LogTest extends JUnitSuite { for(messagesToAppend <- List(0, 1, 25)) { logDir.mkdirs() // first test a log segment starting at 0 - val log = new Log(logDir, logConfig.copy(segmentSize = 100), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time) for(i <- 0 until messagesToAppend) log.append(TestUtils.singleMessageSet(i.toString.getBytes)) @@ -293,7 +298,7 @@ class LogTest extends JUnitSuite { // append messages to log val maxMessageSize = second.sizeInBytes - 1 - val log = new Log(logDir, logConfig.copy(maxMessageSize = maxMessageSize), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(maxMessageSize = maxMessageSize), recoveryPoint = 0L, time.scheduler, time = time) // should be able to append the small message log.append(first) @@ -316,22 +321,23 @@ class LogTest extends JUnitSuite { val segmentSize = 7 * messageSize val indexInterval = 3 * messageSize val config = logConfig.copy(segmentSize = segmentSize, indexInterval = indexInterval, maxIndexSize = 4096) - var log = new Log(logDir, config, needsRecovery = false, time.scheduler, time) + var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize))) assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset) val lastIndexOffset = log.activeSegment.index.lastOffset val numIndexEntries = log.activeSegment.index.entries + val lastOffset = log.logEndOffset log.close() - log = new Log(logDir, config, needsRecovery = false, time.scheduler, time) + log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time) assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset) assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) assertEquals("Should have same number of index entries as 
before.", numIndexEntries, log.activeSegment.index.entries) log.close() // test recovery case - log = new Log(logDir, config, needsRecovery = true, time.scheduler, time) + log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset) assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries) @@ -346,7 +352,7 @@ class LogTest extends JUnitSuite { // publish the messages and close the log val numMessages = 200 val config = logConfig.copy(segmentSize = 200, indexInterval = 1) - var log = new Log(logDir, config, needsRecovery = true, time.scheduler, time) + var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10))) val indexFiles = log.logSegments.map(_.index.file) @@ -356,7 +362,7 @@ class LogTest extends JUnitSuite { indexFiles.foreach(_.delete()) // reopen the log - log = new Log(logDir, config, needsRecovery = true, time.scheduler, time) + log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) for(i <- 0 until numMessages) assertEquals(i, log.read(i, 100, None).head.offset) @@ -374,7 +380,7 @@ class LogTest extends JUnitSuite { val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages // create a log - val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), needsRecovery = false, scheduler = time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, scheduler = time.scheduler, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) @@ -427,7 +433,7 @@ class LogTest extends JUnitSuite { val msgPerSeg = 10 val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages val config = logConfig.copy(segmentSize = segmentSize) - val log = new Log(logDir, config, needsRecovery = false, scheduler = time.scheduler, time = time) + val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) log.append(set) @@ -457,7 +463,7 @@ class LogTest extends JUnitSuite { logConfig.copy(segmentSize = set.sizeInBytes * 5, maxIndexSize = 1000, indexInterval = 1), - needsRecovery = false, + recoveryPoint = 0L, time.scheduler, time) @@ -484,7 +490,7 @@ class LogTest extends JUnitSuite { // create a log var log = new Log(logDir, config, - needsRecovery = true, + recoveryPoint = 0L, time.scheduler, time) @@ -494,7 +500,7 @@ class LogTest extends JUnitSuite { log.close() log = new Log(logDir, config, - needsRecovery = true, + recoveryPoint = 0L, time.scheduler, time) log.truncateTo(3) @@ -515,7 +521,7 @@ class LogTest extends JUnitSuite { indexInterval = 10000) val log = new Log(logDir, config, - needsRecovery = true, + recoveryPoint = 0L, time.scheduler, time) @@ -550,7 +556,7 @@ class LogTest extends JUnitSuite { val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, maxIndexSize = 1000) var log = new Log(logDir, config, - needsRecovery = false, + recoveryPoint = 0L, 
time.scheduler, time) @@ -563,7 +569,7 @@ class LogTest extends JUnitSuite { log = new Log(logDir, config, - needsRecovery = false, + recoveryPoint = 0L, time.scheduler, time) assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) @@ -573,7 +579,7 @@ class LogTest extends JUnitSuite { def testAppendMessageWithNullPayload() { var log = new Log(logDir, LogConfig(), - needsRecovery = false, + recoveryPoint = 0L, time.scheduler, time) log.append(new ByteBufferMessageSet(new Message(bytes = null))) diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 9963502..72348b8 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -36,6 +36,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { defaultConfig = LogConfig(), cleanerConfig = CleanerConfig(), flushCheckMs = 30000, + flushCheckpointMs = 10000L, retentionCheckMs = 30000, scheduler = new KafkaScheduler(1), time = new MockTime)) diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index c5f39cb..3deafaf 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -53,11 +53,12 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { // send some messages producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*) - // do a clean shutdown and check that the clean shudown file is written out + // do a clean shutdown and check that offset checkpoint file exists server.shutdown() for(logDir <- config.logDirs) { - val cleanShutDownFile = new File(logDir, server.logManager.CleanShutdownFile) - assertTrue(cleanShutDownFile.exists) + val OffsetCheckpointFile = new File(logDir, server.logManager.RecoveryPointCheckpointFile) + assertTrue(OffsetCheckpointFile.exists) + assertTrue(OffsetCheckpointFile.length() > 0) } producer.close() -- 1.7.10.2 (Apple Git-33)
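
A note on the recovery-point mechanism this patch introduces, with a small illustrative sketch. The patch drops the ".kafka_cleanshutdown" marker and instead keeps, per log directory, a "recovery-point-offset-checkpoint" file mapping each topic partition to the first offset that has not yet been flushed to disk. LogManager writes it periodically (every log.flush.offset.checkpoint.interval.ms) and once more at shutdown, and on startup Log only re-scans segments at or beyond the checkpointed offset instead of recovering the whole log. The Scala sketch below is not the OffsetCheckpoint class referenced in the diff; the class name, the one-line-per-partition file format, and the /tmp path are assumptions chosen only to illustrate the write/read round trip and the "missing checkpoint means recover from offset 0" default that loadLogs relies on via recoveryPoints.getOrElse(topicPartition, 0L).

import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, StandardCopyOption}
import scala.collection.JavaConverters._

/**
 * Illustrative stand-in for a recovery-point checkpoint: a small text file mapping
 * "topic partition" -> first unflushed offset. The class name and one-line-per-entry
 * format are assumptions for this sketch, not the broker's actual checkpoint format.
 */
class RecoveryPointCheckpoint(file: File) {

  /** Write all recovery points, replacing the previous checkpoint via a temp-file swap. */
  def write(recoveryPoints: Map[(String, Int), Long]): Unit = {
    val tmp = new File(file.getAbsolutePath + ".tmp")
    val lines = recoveryPoints.map { case ((topic, partition), offset) =>
      topic + " " + partition + " " + offset
    }
    Files.write(tmp.toPath, lines.asJava, StandardCharsets.UTF_8)
    // rename so a reader never observes a partially written checkpoint
    Files.move(tmp.toPath, file.toPath, StandardCopyOption.REPLACE_EXISTING)
  }

  /** Read the checkpoint; a missing file means every log recovers from offset 0. */
  def read(): Map[(String, Int), Long] = {
    if (!file.exists())
      return Map.empty
    Files.readAllLines(file.toPath, StandardCharsets.UTF_8).asScala
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { line =>
        val Array(topic, partition, offset) = line.split(" ") // assumes well-formed lines
        (topic, partition.toInt) -> offset.toLong
      }.toMap
  }
}

object RecoveryPointCheckpointExample {
  def main(args: Array[String]): Unit = {
    // path chosen only for the example
    val checkpoint = new RecoveryPointCheckpoint(new File("/tmp/recovery-point-offset-checkpoint"))

    // at shutdown, or on the periodic checkpoint schedule, record how far each log has been flushed
    checkpoint.write(Map(("test-a", 1) -> 50L, ("test-b", 1) -> 100L))

    // on startup, only segments containing offsets >= the recovery point need to be re-scanned
    val recoveryPoint = checkpoint.read().getOrElse(("test-a", 1), 0L)
    println("recover test-a/1 from offset " + recoveryPoint)
  }
}

The sketch stages the new contents in a temporary file and renames it over the old checkpoint, so a crash mid-write leaves the previous (stale but consistent) recovery points rather than a truncated file; a stale recovery point is safe here because it only causes extra segments to be re-scanned on startup, never skipped.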