From 6666709d20520a0b7342b8163d883d08699f54e4 Mon Sep 17 00:00:00 2001 From: Jay Kreps Date: Wed, 17 Jul 2013 14:45:03 -0700 Subject: [PATCH] KAFKA-615 fsync asynchronous from log roll. --- config/server.properties | 18 +-- core/src/main/scala/kafka/log/FileMessageSet.scala | 4 +- core/src/main/scala/kafka/log/Log.scala | 134 +++++++++++---------- core/src/main/scala/kafka/log/LogManager.scala | 38 ++++-- core/src/main/scala/kafka/log/LogSegment.scala | 11 +- core/src/main/scala/kafka/log/OffsetIndex.scala | 1 - core/src/main/scala/kafka/server/KafkaConfig.scala | 14 +-- core/src/main/scala/kafka/server/KafkaServer.scala | 1 + .../main/scala/kafka/server/ReplicaManager.scala | 1 + .../src/test/scala/other/kafka/StressTestLog.scala | 2 +- .../scala/other/kafka/TestLinearWriteSpeed.scala | 116 ++++++++++++++---- .../scala/other/kafka/TestLogPerformance.scala | 58 --------- .../test/scala/unit/kafka/log/CleanerTest.scala | 2 +- .../unit/kafka/log/LogCleanerIntegrationTest.scala | 2 +- .../test/scala/unit/kafka/log/LogManagerTest.scala | 39 +++++- .../test/scala/unit/kafka/log/LogSegmentTest.scala | 13 +- core/src/test/scala/unit/kafka/log/LogTest.scala | 88 ++++++++++---- .../server/HighwatermarkPersistenceTest.scala | 1 + .../unit/kafka/server/ServerShutdownTest.scala | 7 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 9 ++ 20 files changed, 330 insertions(+), 229 deletions(-) delete mode 100644 core/src/test/scala/other/kafka/TestLogPerformance.scala diff --git a/config/server.properties b/config/server.properties index 01e0b12..7685879 100644 --- a/config/server.properties +++ b/config/server.properties @@ -56,23 +56,15 @@ num.partitions=2 ############################# Log Flush Policy ############################# -# The following configurations control the flush of data to disk. This is the most -# important performance knob in kafka. -# There are a few important trade-offs here: -# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. -# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency). -# 3. Throughput: The flush is generally the most expensive operation. -# The settings below allow one to configure the flush policy to flush data after a period of time or -# every N messages (or both). This can be done globally and overridden on a per-topic basis. +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The below configuration can enforce a more aggressive application level +# fsync policy. This will have a significant performance impact. 
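+# For example, uncommenting both settings below would force an fsync roughly every 10000 messages
+# or every 1000 ms of unflushed data, whichever limit is reached first (the values shown are
+# illustrative, not recommendations).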
# The number of messages to accept before forcing a flush of data to disk -log.flush.interval.messages=10000 +#log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush -log.flush.interval.ms=1000 - -# Per-topic overrides for log.flush.interval.ms -#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000 +#log.flush.interval.ms=1000 ############################# Log Retention Policy #############################
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index 1afb533..2479a5f 100644 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -52,11 +52,9 @@ class FileMessageSet private[kafka](@volatile var file: File, new AtomicInteger(math.min(channel.size().toInt, end) - start) /* if this is not a slice, update the file pointer to the end of the file */ - if (!isSlice) { - debug("Creating or reloading log segment %s".format(file.getAbsolutePath)) + if (!isSlice) /* set the file position to the last byte in the file */ channel.position(channel.size) - } /** * Create a file message set with no slicing.
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 87151b9..3cac7e3 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -38,19 +38,17 @@ import com.yammer.metrics.core.Gauge * for a given segment. * * @param dir The directory in which log segments are created. - * @param maxSegmentSize The maximum segment size in bytes. - * @param maxMessageSize The maximum message size in bytes (including headers) that will be allowed in this log. - * @param flushInterval The number of messages that can be appended to this log before we force a flush of the log. - * @param rollIntervalMs The time after which we will force the rolling of a new log segment - * @param needsRecovery Should we run recovery on this log when opening it? This should be done if the log wasn't cleanly shut down. - * @param maxIndexSize The maximum size of an offset index in this log. The index of the active log segment will be pre-allocated to this size. - * @param indexIntervalBytes The (approximate) number of bytes between entries in the offset index for this log. + * @param config The log configuration settings + * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk + * @param scheduler The thread pool scheduler used for background actions + * @param time The time instance used for checking the clock * */ @threadsafe class Log(val dir: File, @volatile var config: LogConfig, - val needsRecovery: Boolean, + @volatile var recoveryPoint: Long = 0L, val scheduler: Scheduler, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { @@ -59,14 +57,12 @@ class Log(val dir: File, /* A lock that guards all modifications to the log */ private val lock = new Object - /* The current number of unflushed messages appended to the write */ - private val unflushed = new AtomicInteger(0) - /* last time it was flushed */ private val lastflushedTime = new AtomicLong(time.milliseconds) /* the actual segments of the log */ - private val segments: ConcurrentNavigableMap[Long,LogSegment] = loadSegments() + private val segments: ConcurrentNavigableMap[Long,LogSegment] = new ConcurrentSkipListMap[Long, LogSegment] + loadSegments() /* The number of times the log has been truncated */ private val truncates = new AtomicInteger(0) @@ -86,10 +82,7 @@ class Log(val dir: File, def name = dir.getName() /* Load the log segments from the log files on disk */ - private def loadSegments(): ConcurrentNavigableMap[Long, LogSegment] = { - // open all the segments read-only - val logSegments = new ConcurrentSkipListMap[Long, LogSegment] - + private def loadSegments() { // create the log directory if it doesn't exist dir.mkdirs() @@ -145,46 +138,57 @@ class Log(val dir: File, error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) segment.recover(config.maxMessageSize) } - logSegments.put(start, segment) + segments.put(start, segment) } } if(logSegments.size == 0) { // no existing segments, create a new mutable segment beginning at offset 0 - logSegments.put(0, - new LogSegment(dir = dir, + segments.put(0, new LogSegment(dir = dir, startOffset = 0, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize)) } else { + recoverLog() // reset the index size of the currently active log segment to allow more entries - val active = logSegments.lastEntry.getValue - active.index.resize(config.maxIndexSize) + activeSegment.index.resize(config.maxIndexSize) + } - // run recovery on the active segment if necessary - if(needsRecovery) { + // sanity check the index file of every segment: it should either be empty or have a last offset greater than its base offset.
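+ // (index entries are stored relative to the segment's base offset, so a non-empty index whose last offset is not above that base offset cannot be valid)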
+ for (s <- logSegments) { + require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset, + "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d" + .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset)) + } + } + + private def recoverLog() { + val lastOffset = try {activeSegment.nextOffset} catch {case _ => -1L} + if(lastOffset == this.recoveryPoint) { + info("Log '%s' is fully intact, skipping recovery".format(name)) + return + } + val unflushed = logSegments(segments.floorKey(this.recoveryPoint), Long.MaxValue).iterator + while(unflushed.hasNext) { + val curr = unflushed.next + info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name)) + val truncatedBytes = try { - info("Recovering active segment of %s.".format(name)) - active.recover(config.maxMessageSize) + curr.recover(config.maxMessageSize) } catch { - case e: InvalidOffsetException => - val startOffset = active.baseOffset + case e: InvalidOffsetException => + val startOffset = curr.baseOffset warn("Found invalid offset during recovery of the active segment for topic partition " + dir.getName +". Deleting the segment and " + "creating an empty one with starting offset " + startOffset) // truncate the active segment to its starting offset - active.truncateTo(startOffset) + curr.truncateTo(startOffset) } + if(truncatedBytes > 0) { + // we had an invalid message, delete all remaining log + warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset)) + unflushed.foreach(deleteSegment) } } - - // Check for the index file of every segment, if it's empty or its last offset is greater than its base offset. - for (s <- asIterable(logSegments.values)) { - require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset, - "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d" - .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset)) - } - - logSegments } /** @@ -272,7 +276,8 @@ class Log(val dir: File, trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s" .format(this.name, appendInfo.firstOffset, nextOffset.get(), validMessages)) - maybeFlush(appendInfo.shallowCount) + if(unflushedMessages >= config.flushInterval) + flush() appendInfo } @@ -285,7 +290,6 @@ class Log(val dir: File, * @param firstOffset The first offset in the message set * @param lastOffset The last offset in the message set * @param codec The codec used in the message set - * @param count The number of messages * @param offsetsMonotonic Are the offsets in this message set monotonically increasing */ case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, offsetsMonotonic: Boolean) @@ -452,11 +456,8 @@ class Log(val dir: File, * @return The newly rolled segment */ def roll(): LogSegment = { + val start = time.nanoseconds lock synchronized { - // flush the log to ensure that only the active segment needs to be recovered - if(!segments.isEmpty()) - flush() - val newOffset = logEndOffset val logFile = logFilename(dir, newOffset) val indexFile = indexFilename(dir, newOffset) @@ -465,7 +466,6 @@ class Log(val dir: File, file.delete() } - info("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName) segments.lastEntry() match { case null => case entry => entry.getValue.index.trimToValidSize() @@ -477,33 
+477,43 @@ class Log(val dir: File, val prev = addSegment(segment) if(prev != null) throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset)) + + // schedule an asynchronous flush of the old segment + scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L) + + info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0))) + segment } } - + /** - * Flush the log if necessary - * @param numberOfMessages The number of messages that are being appended + * The number of messages appended to the log since the last flush */ - private def maybeFlush(numberOfMessages : Int) { - if(unflushed.addAndGet(numberOfMessages) >= config.flushInterval) - flush() - } + def unflushedMessages() = this.logEndOffset - this.recoveryPoint + + /** + * Flush all log segments + */ + def flush(): Unit = flush(this.logEndOffset) /** - * Flush this log file and associated index to the physical disk + * Flush log segments for all offsets up to offset-1 + * @param offset The offset to flush up to (non-inclusive); the new recovery point */ - def flush() : Unit = { - if (unflushed.get == 0) + def flush(offset: Long) : Unit = { + if (offset <= this.recoveryPoint) return - - debug("Flushing log '" + name + "' last flushed: " + lastFlushTime + " current time: " + - time.milliseconds + " unflushed = " + unflushed.get) + debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " + + time.milliseconds + " unflushed = " + unflushedMessages) + for(segment <- logSegments(this.recoveryPoint, offset)) + segment.flush() lock synchronized { - activeSegment.flush() - unflushed.set(0) - lastflushedTime.set(time.milliseconds) - } + if(offset > this.recoveryPoint) { + this.recoveryPoint = offset + lastflushedTime.set(time.milliseconds) + } + } } /** @@ -534,6 +544,7 @@ class Log(val dir: File, deletable.foreach(deleteSegment(_)) activeSegment.truncateTo(targetOffset) this.nextOffset.set(targetOffset) + this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) } truncates.getAndIncrement } @@ -553,6 +564,7 @@ class Log(val dir: File, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize)) this.nextOffset.set(newOffset) + this.recoveryPoint = math.min(newOffset, this.recoveryPoint) truncates.getAndIncrement } } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 9002483..d039f9d 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -23,6 +23,7 @@ import kafka.utils._ import scala.collection._ import kafka.common.{TopicAndPartition, KafkaException} import kafka.server.KafkaConfig +import kafka.server.OffsetCheckpoint /** @@ -41,11 +42,12 @@ class LogManager(val logDirs: Array[File], val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig, val flushCheckMs: Long, + val flushCheckpointMs: Long, val retentionCheckMs: Long, scheduler: Scheduler, private val time: Time) extends Logging { - val CleanShutdownFile = ".kafka_cleanshutdown" + val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" val LockFile = ".lock" val InitialTaskDelayMs = 30*1000 private val logCreationLock = new Object @@ -53,6 +55,7 @@ class LogManager(val logDirs: Array[File], createAndValidateLogDirs(logDirs) private var dirLocks = lockLogDirs(logDirs) + private val recoveryPointCheckpoints = 
logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap loadLogs(logDirs) private val cleaner: LogCleaner = @@ -102,10 +105,7 @@ class LogManager(val logDirs: Array[File], */ private def loadLogs(dirs: Seq[File]) { for(dir <- dirs) { - /* check if this set of logs was shut down cleanly */ - val cleanShutDownFile = new File(dir, CleanShutdownFile) - val needsRecovery = !cleanShutDownFile.exists - cleanShutDownFile.delete + val recoveryPoints = this.recoveryPointCheckpoints(dir).read /* load the logs */ val subDirs = dir.listFiles() if(subDirs != null) { @@ -116,7 +116,7 @@ class LogManager(val logDirs: Array[File], val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, - needsRecovery, + recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time) val previous = this.logs.put(topicPartition, log) @@ -146,6 +146,11 @@ class LogManager(val logDirs: Array[File], delay = InitialTaskDelayMs, period = flushCheckMs, TimeUnit.MILLISECONDS) + scheduler.schedule("kafka-recovery-point-checkpoint", + checkpointRecoveryPointOffsets, + delay = InitialTaskDelayMs, + period = flushCheckpointMs, + TimeUnit.MILLISECONDS) } if(cleanerConfig.enableCleaner) cleaner.startup() @@ -160,10 +165,12 @@ class LogManager(val logDirs: Array[File], // stop the cleaner first if(cleaner != null) Utils.swallow(cleaner.shutdown()) + // flush the logs to ensure latest possible recovery point + allLogs.foreach(_.flush()) // close the logs allLogs.foreach(_.close()) - // mark that the shutdown was clean by creating the clean shutdown marker file - logDirs.foreach(dir => Utils.swallow(new File(dir, CleanShutdownFile).createNewFile())) + // update the last flush point + checkpointRecoveryPointOffsets() } finally { // regardless of whether the close succeeded, we need to unlock the data directories dirLocks.foreach(_.destroy()) @@ -172,6 +179,19 @@ class LogManager(val logDirs: Array[File], } /** + * Write out the current recovery point for all logs to a text file in the log directory + * to avoid recovering the whole log on startup. + */ + def checkpointRecoveryPointOffsets() { + val recoveryPointsByDir = this.logsByTopicPartition.groupBy(_._2.dir.getParent.toString) + for(dir <- logDirs) { + val recoveryPoints = recoveryPointsByDir.get(dir.toString) + if(recoveryPoints.isDefined) + this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint)) + } + } + + /** * Get the log if it exists, otherwise return None */ def getLog(topicAndPartition: TopicAndPartition): Option[Log] = { @@ -200,7 +220,7 @@ class LogManager(val logDirs: Array[File], dir.mkdirs() log = new Log(dir, config, - needsRecovery = false, + recoveryPoint = 0L, scheduler, time) logs.put(topicAndPartition, log) diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 38e6cd5..361aa53 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -147,14 +147,17 @@ class LogSegment(val log: FileMessageSet, } /** - * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log. + * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index. 
* * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this * is corrupt. + * + * @return The number of message truncated from the log */ @nonthreadsafe - def recover(maxMessageSize: Int) { + def recover(maxMessageSize: Int): Int = { index.truncate() + index.resize(index.maxIndexSize) var validBytes = 0 var lastIndexEntry = 0 val iter = log.iterator(maxMessageSize) @@ -181,9 +184,9 @@ class LogSegment(val log: FileMessageSet, logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage)) } val truncated = log.sizeInBytes - validBytes - if(truncated > 0) - warn("Truncated " + truncated + " invalid bytes from the log segment %s.".format(log.file.getAbsolutePath)) log.truncateTo(validBytes) + index.trimToValidSize() + truncated } override def toString() = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")" diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index fbc728c..afab848 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -268,7 +268,6 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi */ def resize(newSize: Int) { this synchronized { - flush() val raf = new RandomAccessFile(file, "rws") val roundedNewSize = roundToExactMultiple(newSize, 8) try { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index f7d8b03..ebbbdea 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -145,19 +145,19 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue)) /* the number of messages accumulated on a log partition before messages are flushed to disk */ - val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 10000, (1, Int.MaxValue)) + val logFlushIntervalMessages = props.getLongInRange("log.flush.interval.messages", Long.MaxValue, (1, Long.MaxValue)) /* the amount of time to wait before deleting a file from the filesystem */ val logDeleteDelayMs = props.getLongInRange("log.segment.delete.delay.ms", 60000, (0, Long.MaxValue)) - /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */ - val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt) - /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */ - val logFlushSchedulerIntervalMs = props.getInt("log.flush.scheduler.interval.ms", 3000) + val logFlushSchedulerIntervalMs = props.getLong("log.flush.scheduler.interval.ms", Long.MaxValue) /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */ - val logFlushIntervalMs = props.getInt("log.flush.interval.ms", logFlushSchedulerIntervalMs) + val logFlushIntervalMs = props.getLong("log.flush.interval.ms", logFlushSchedulerIntervalMs) + + /* the frequency with which we update the persistent record of the last flush which acts as the log recovery point */ + val logFlushOffsetCheckpointIntervalMs = props.getIntInRange("log.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue)) /* 
enable auto creation of topic on the server */ val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true) @@ -223,4 +223,4 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the maximum size for a metadata entry associated with an offset commit */ val offsetMetadataMaxSize = props.getInt("offset.metadata.max.bytes", 1024) -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 2176958..a925ae1 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -277,6 +277,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg defaultConfig = defaultLogConfig, cleanerConfig = cleanerConfig, flushCheckMs = config.logFlushSchedulerIntervalMs, + flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs, retentionCheckMs = config.logCleanupIntervalMs, scheduler = kafkaScheduler, time = time) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 73c87c6..539d0cf 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -236,6 +236,7 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]" .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicAndPartition._1, topicAndPartition._2)) + logManager.checkpointRecoveryPointOffsets() } info("Handled leader and isr request %s".format(leaderAndISRRequest)) // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index c6e7a57..8fcd068 100644 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -37,7 +37,7 @@ object StressTestLog { config = LogConfig(segmentSize = 64*1024*1024, maxMessageSize = Int.MaxValue, maxIndexSize = 1024*1024), - needsRecovery = false, + recoveryPoint = 0L, scheduler = time.scheduler, time = time) val writer = new WriterThread(log) diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 36d52e7..eeb8c88 100644 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -20,9 +20,16 @@ package kafka import java.io._ import java.nio._ import java.nio.channels._ +import java.util.Random +import kafka.log._ +import kafka.utils._ +import kafka.message._ import scala.math._ import joptsimple._ +/** + * This test does linear writes using either a kafka log or a file and measures throughput and latency. 
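+ * For example (flag values here are illustrative only), it can be run with something like:
+ *   ./bin/kafka-run-class.sh kafka.TestLinearWriteSpeed --log --dir /tmp --bytes 1073741824 --size 4096 --message-size 1024 --files 1 --compression none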
+ */ object TestLinearWriteSpeed { def main(args: Array[String]): Unit = { @@ -32,7 +39,7 @@ object TestLinearWriteSpeed { .describedAs("path") .ofType(classOf[java.lang.String]) .defaultsTo(System.getProperty("java.io.tmpdir")) - val bytesOpt = parser.accepts("bytes", "REQUIRED: The number of bytes to write.") + val bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.") .withRequiredArg .describedAs("num_bytes") .ofType(classOf[java.lang.Long]) @@ -40,7 +47,12 @@ object TestLinearWriteSpeed { .withRequiredArg .describedAs("num_bytes") .ofType(classOf[java.lang.Integer]) - val filesOpt = parser.accepts("files", "REQUIRED: The number of files.") + val messageSizeOpt = parser.accepts("message-size", "REQUIRED: The size of each message in the message set.") + .withRequiredArg + .describedAs("num_bytes") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024) + val filesOpt = parser.accepts("files", "REQUIRED: The number of logs or files.") .withRequiredArg .describedAs("num_files") .ofType(classOf[java.lang.Integer]) @@ -55,7 +67,19 @@ object TestLinearWriteSpeed { .describedAs("mb") .ofType(classOf[java.lang.Integer]) .defaultsTo(Integer.MAX_VALUE) - val mmapOpt = parser.accepts("mmap", "Mmap file.") + val flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes") + .withRequiredArg() + .describedAs("message_count") + .ofType(classOf[java.lang.Long]) + .defaultsTo(Long.MaxValue) + val compressionCodecOpt = parser.accepts("compression", "The compression codec to use") + .withRequiredArg + .describedAs("codec") + .ofType(classOf[java.lang.String]) + .defaultsTo(NoCompressionCodec.name) + val mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.") + val channelOpt = parser.accepts("channel", "Do writes to file channesl.") + val logOpt = parser.accepts("log", "Do writes to kafka logs.") val options = parser.parse(args : _*) @@ -68,26 +92,35 @@ object TestLinearWriteSpeed { } var bytesToWrite = options.valueOf(bytesOpt).longValue - val mmap = options.has(mmapOpt) val bufferSize = options.valueOf(sizeOpt).intValue val numFiles = options.valueOf(filesOpt).intValue val reportingInterval = options.valueOf(reportingIntervalOpt).longValue val dir = options.valueOf(dirOpt) val maxThroughputBytes = options.valueOf(maxThroughputOpt).intValue * 1024L * 1024L val buffer = ByteBuffer.allocate(bufferSize) - while(buffer.hasRemaining) - buffer.put(123.asInstanceOf[Byte]) + val messageSize = options.valueOf(messageSizeOpt).intValue + val flushInterval = options.valueOf(flushIntervalOpt).longValue + val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt)) + val rand = new Random + rand.nextBytes(buffer.array) + val numMessages = bufferSize / (messageSize + MessageSet.LogOverhead) + val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = (0 until numMessages).map(x => new Message(new Array[Byte](messageSize))): _*) val writables = new Array[Writable](numFiles) + val scheduler = new KafkaScheduler(1) + scheduler.startup() for(i <- 0 until numFiles) { - val file = new File(dir, "kafka-test-" + i + ".dat") - file.deleteOnExit() - val raf = new RandomAccessFile(file, "rw") - raf.setLength(bytesToWrite / numFiles) - if(mmap) - writables(i) = new MmapWritable(raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length())) - else - writables(i) = new ChannelWritable(raf.getChannel()) + if(options.has(mmapOpt)) { + writables(i) = new MmapWritable(new File(dir, 
"kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer) + } else if(options.has(channelOpt)) { + writables(i) = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer) + } else if(options.has(logOpt)) { + val segmentSize = rand.nextInt(512)*1024*1024 + 64*1024*1024 // vary size to avoid herd effect + writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(segmentSize=segmentSize, flushInterval = flushInterval), scheduler, messageSet) + } else { + System.err.println("Must specify what to write to with one of --log, --channel, or --mmap") + System.exit(1) + } } bytesToWrite = (bytesToWrite / numFiles) * numFiles @@ -101,15 +134,14 @@ object TestLinearWriteSpeed { var totalWritten = 0L var lastReport = beginTest while(totalWritten + bufferSize < bytesToWrite) { - buffer.rewind() val start = System.nanoTime - writables((count % numFiles).toInt.abs).write(buffer) + val writeSize = writables((count % numFiles).toInt.abs).write() val ellapsed = System.nanoTime - start maxLatency = max(ellapsed, maxLatency) totalLatency += ellapsed - written += bufferSize + written += writeSize count += 1 - totalWritten += bufferSize + totalWritten += writeSize if((start - lastReport)/(1000.0*1000.0) > reportingInterval.doubleValue) { val ellapsedSecs = (start - lastReport) / (1000.0*1000.0*1000.0) val mb = written / (1024.0*1024.0) @@ -118,7 +150,7 @@ object TestLinearWriteSpeed { written = 0 maxLatency = 0L totalLatency = 0L - } else if(written > maxThroughputBytes) { + } else if(written > maxThroughputBytes * (reportingInterval / 1000.0)) { // if we have written enough, just sit out this reporting interval val lastReportMs = lastReport / (1000*1000) val now = System.nanoTime / (1000*1000) @@ -129,21 +161,53 @@ object TestLinearWriteSpeed { } val elapsedSecs = (System.nanoTime - beginTest) / (1000.0*1000.0*1000.0) println(bytesToWrite / (1024.0 * 1024.0 * elapsedSecs) + " MB per sec") + scheduler.shutdown() } trait Writable { - def write(buffer: ByteBuffer) + def write(): Int + def close() } - class MmapWritable(val buffer: ByteBuffer) extends Writable { - def write(b: ByteBuffer) { - buffer.put(b) + class MmapWritable(val file: File, size: Long, val content: ByteBuffer) extends Writable { + file.deleteOnExit() + val raf = new RandomAccessFile(file, "rw") + raf.setLength(size) + val buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length()) + def write(): Int = { + buffer.put(content) + content.rewind() + content.limit + } + def close() { + raf.close() } } - class ChannelWritable(val channel: FileChannel) extends Writable { - def write(b: ByteBuffer) { - channel.write(b) + class ChannelWritable(val file: File, val content: ByteBuffer) extends Writable { + file.deleteOnExit() + val raf = new RandomAccessFile(file, "rw") + val channel = raf.getChannel + def write(): Int = { + channel.write(content) + content.rewind() + content.limit + } + def close() { + raf.close() + } + } + + class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet) extends Writable { + Utils.rm(dir) + val log = new Log(dir, config, 0L, scheduler, SystemTime) + def write(): Int = { + log.append(messages, true) + messages.sizeInBytes + } + def close() { + log.close() + Utils.rm(log.dir) } } diff --git a/core/src/test/scala/other/kafka/TestLogPerformance.scala b/core/src/test/scala/other/kafka/TestLogPerformance.scala deleted file mode 100644 index d91011e..0000000 --- a/core/src/test/scala/other/kafka/TestLogPerformance.scala +++ 
/dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import kafka.message._ -import kafka.utils.{SystemTime, TestUtils, Utils, KafkaScheduler} -import kafka.server.KafkaConfig - -object TestLogPerformance { - - def main(args: Array[String]): Unit = { - if(args.length < 4) - Utils.croak("USAGE: java " + getClass().getName() + " num_messages message_size batch_size compression_codec") - val numMessages = args(0).toInt - val messageSize = args(1).toInt - val batchSize = args(2).toInt - val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt) - val props = TestUtils.createBrokerConfig(0, -1) - val config = new KafkaConfig(props) - val dir = TestUtils.tempDir() - val scheduler = new KafkaScheduler(1) - val logConfig = LogConfig() - val log = new Log(dir, logConfig, needsRecovery = false, scheduler = scheduler, time = SystemTime) - val bytes = new Array[Byte](messageSize) - new java.util.Random().nextBytes(bytes) - val message = new Message(bytes) - val messages = new Array[Message](batchSize) - for(i <- 0 until batchSize) - messages(i) = message - val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = messages: _*) - val numBatches = numMessages / batchSize - val start = System.currentTimeMillis() - for(i <- 0 until numBatches) - log.append(messageSet) - log.close() - val elapsed = (System.currentTimeMillis() - start) / 1000.0 - val writtenBytes = MessageSet.entrySize(message) * numMessages - println("message size = " + MessageSet.entrySize(message)) - println("MB/sec: " + writtenBytes / elapsed / (1024.0 * 1024.0)) - Utils.rm(dir) - } - -} diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala index 4619d86..5a312bf 100644 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala @@ -195,7 +195,7 @@ class CleanerTest extends JUnitSuite { } def makeLog(dir: File = dir, config: LogConfig = logConfig) = - new Log(dir = dir, config = config, needsRecovery = false, scheduler = time.scheduler, time = time) + new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) def makeCleaner(capacity: Int) = new Cleaner(id = 0, diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 15e9b60..1de3ef0 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -102,7 +102,7 @@ class LogCleanerIntegrationTest extends JUnitSuite { dir.mkdirs() val log = new Log(dir = dir, LogConfig(segmentSize = segmentSize, 
maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, dedupe = true), - needsRecovery = false, + recoveryPoint = 0L, scheduler = time.scheduler, time = time) logs.put(TopicAndPartition("log", i), log) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 6916df4..b4bee33 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -24,6 +24,7 @@ import org.scalatest.junit.JUnit3Suite import kafka.server.KafkaConfig import kafka.common._ import kafka.utils._ +import kafka.server.OffsetCheckpoint class LogManagerTest extends JUnit3Suite { @@ -40,7 +41,15 @@ class LogManagerTest extends JUnit3Suite { override def setUp() { super.setUp() logDir = TestUtils.tempDir() - logManager = new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time) + logManager = new LogManager(logDirs = Array(logDir), + topicConfigs = Map(), + defaultConfig = logConfig, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 100000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + time = time) logManager.startup logDir = logManager.logDirs(0) } @@ -116,7 +125,7 @@ class LogManagerTest extends JUnit3Suite { logManager.shutdown() val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L) - logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time) + logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 100000L, 1000L, time.scheduler, time) logManager.startup // create a log @@ -156,7 +165,7 @@ class LogManagerTest extends JUnit3Suite { def testTimeBasedFlush() { logManager.shutdown() val config = logConfig.copy(flushMs = 1000) - logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time) + logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time) logManager.startup val log = logManager.createLog(TopicAndPartition(name, 0), config) val lastFlush = log.lastFlushTime @@ -178,7 +187,7 @@ class LogManagerTest extends JUnit3Suite { TestUtils.tempDir(), TestUtils.tempDir()) logManager.shutdown() - logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time) + logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time) // verify that logs are always assigned to the least loaded partition for(partition <- 0 until 20) { @@ -194,10 +203,30 @@ class LogManagerTest extends JUnit3Suite { */ def testTwoLogManagersUsingSameDirFails() { try { - new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 1000L, time.scheduler, time) + new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time) fail("Should not be able to create a second log manager instance with the same data directory") } catch { case e: KafkaException => // this is good } } + + /** + * Test that recovery points are correctly written out to disk + */ + def testCheckpointRecoveryPoints() { + val topicA = TopicAndPartition("test-a", 1) + val topicB = TopicAndPartition("test-b", 1) + val logA = this.logManager.createLog(topicA, logConfig) + val logB = this.logManager.createLog(topicB, logConfig) + for(i <- 0 until 50) + 
logA.append(TestUtils.singleMessageSet("test".getBytes())) + for(i <- 0 until 100) + logB.append(TestUtils.singleMessageSet("test".getBytes())) + logA.flush() + logB.flush() + logManager.checkpointRecoveryPointOffsets() + val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read() + assertEquals("Recovery point should equal checkpoint", checkpoints(topicA), logA.recoveryPoint) + assertEquals("Recovery point should equal checkpoint", checkpoints(topicB), logB.recoveryPoint) + } } diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 3a4b41b..5f2c2e8 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -201,7 +201,7 @@ class LogSegmentTest extends JUnit3Suite { for(i <- 0 until 100) seg.append(i, messages(i, i.toString)) val indexFile = seg.index.file - writeNonsense(indexFile, 5, indexFile.length.toInt) + TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt) seg.recover(64*1024) for(i <- 0 until 100) assertEquals(i, seg.read(i, Some(i+1), 1024).head.offset) @@ -221,20 +221,11 @@ class LogSegmentTest extends JUnit3Suite { val offsetToBeginCorruption = rand.nextInt(messagesAppended) // start corrupting somewhere in the middle of the chosen record all the way to the end val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15) - writeNonsense(seg.log.file, position, seg.log.file.length.toInt - position) + TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position) seg.recover(64*1024) assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList) seg.delete() } } - def writeNonsense(fileName: File, position: Long, size: Int) { - val file = new RandomAccessFile(fileName, "rw") - file.seek(position) - val rand = new Random - for(i <- 0 until size) - file.writeByte(rand.nextInt(255)) - file.close() - } - } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 7d41938..b7f43e2 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -67,7 +67,7 @@ class LogTest extends JUnitSuite { // create a log val log = new Log(logDir, logConfig.copy(segmentMs = 1 * 60 * 60L), - needsRecovery = false, + recoveryPoint = 0L, scheduler = time.scheduler, time = time) time.sleep(log.config.segmentMs + 1) @@ -102,7 +102,7 @@ class LogTest extends JUnitSuite { val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages // create a log - val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, time.scheduler, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) // segments expire in size @@ -118,7 +118,7 @@ class LogTest extends JUnitSuite { @Test def testLoadEmptyLog() { createEmptyLogs(logDir, 0) - val log = new Log(logDir, logConfig, needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig, recoveryPoint = 0L, time.scheduler, time = time) log.append(TestUtils.singleMessageSet("test".getBytes)) } @@ -127,7 +127,7 @@ class LogTest extends JUnitSuite { */ @Test def 
testAppendAndReadWithSequentialOffsets() { - val log = new Log(logDir, logConfig.copy(segmentSize = 71), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time) val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray for(i <- 0 until messages.length) @@ -146,7 +146,7 @@ */ @Test def testAppendAndReadWithNonSequentialOffsets() { - val log = new Log(logDir, logConfig.copy(segmentSize = 71), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val messages = messageIds.map(id => new Message(id.toString.getBytes)) @@ -169,7 +169,7 @@ */ @Test def testReadAtLogGap() { - val log = new Log(logDir, logConfig.copy(segmentSize = 300), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 300), recoveryPoint = 0L, time.scheduler, time = time) // keep appending until we have two segments with only a single message in the second segment while(log.numberOfSegments == 1) @@ -189,7 +189,7 @@ @Test def testReadOutOfRange() { createEmptyLogs(logDir, 1024) - val log = new Log(logDir, logConfig.copy(segmentSize = 1024), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 1024), recoveryPoint = 0L, time.scheduler, time = time) assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes) try { log.read(0, 1024) @@ -212,7 +212,7 @@ @Test def testLogRolls() { /* create a multipart log with 100 messages */ - val log = new Log(logDir, logConfig.copy(segmentSize = 100), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time) val numMessages = 100 val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes)) messageSets.foreach(log.append(_)) @@ -228,6 +228,11 @@ } val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)) assertEquals("Should be no more messages", 0, lastRead.size) + + // check that rolling the log forced a flush of the log--the flush is async so retry in case of failure + TestUtils.retry(1000L){ + assertTrue("Log roll should have forced flush", log.recoveryPoint >= log.activeSegment.baseOffset) + } } /** @@ -236,7 +241,7 @@ @Test def testCompressedMessages() { /* this log should roll after every messageset */ - val log = new Log(logDir, logConfig.copy(segmentSize = 10), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 10), recoveryPoint = 0L, time.scheduler, time = time) /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) @@ -259,7 +264,7 @@ for(messagesToAppend <- List(0, 1, 25)) { logDir.mkdirs() // first test a log segment starting at 0 - val log = new Log(logDir,
logConfig.copy(segmentSize = 100), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time) for(i <- 0 until messagesToAppend) log.append(TestUtils.singleMessageSet(i.toString.getBytes)) @@ -293,7 +298,7 @@ class LogTest extends JUnitSuite { // append messages to log val maxMessageSize = second.sizeInBytes - 1 - val log = new Log(logDir, logConfig.copy(maxMessageSize = maxMessageSize), needsRecovery = false, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(maxMessageSize = maxMessageSize), recoveryPoint = 0L, time.scheduler, time = time) // should be able to append the small message log.append(first) @@ -316,22 +321,23 @@ class LogTest extends JUnitSuite { val segmentSize = 7 * messageSize val indexInterval = 3 * messageSize val config = logConfig.copy(segmentSize = segmentSize, indexInterval = indexInterval, maxIndexSize = 4096) - var log = new Log(logDir, config, needsRecovery = false, time.scheduler, time) + var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize))) assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset) val lastIndexOffset = log.activeSegment.index.lastOffset val numIndexEntries = log.activeSegment.index.entries + val lastOffset = log.logEndOffset log.close() - log = new Log(logDir, config, needsRecovery = false, time.scheduler, time) + log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time) assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset) assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries) log.close() // test recovery case - log = new Log(logDir, config, needsRecovery = true, time.scheduler, time) + log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset) assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries) @@ -346,7 +352,7 @@ class LogTest extends JUnitSuite { // publish the messages and close the log val numMessages = 200 val config = logConfig.copy(segmentSize = 200, indexInterval = 1) - var log = new Log(logDir, config, needsRecovery = true, time.scheduler, time) + var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10))) val indexFiles = log.logSegments.map(_.index.file) @@ -356,7 +362,7 @@ class LogTest extends JUnitSuite { indexFiles.foreach(_.delete()) // reopen the log - log = new Log(logDir, config, needsRecovery = true, time.scheduler, time) + log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) for(i <- 0 until numMessages) assertEquals(i, log.read(i, 100, None).head.offset) @@ 
-374,7 +380,7 @@ class LogTest extends JUnitSuite { val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages // create a log - val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), needsRecovery = false, scheduler = time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, scheduler = time.scheduler, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) @@ -427,7 +433,7 @@ class LogTest extends JUnitSuite { val msgPerSeg = 10 val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages val config = logConfig.copy(segmentSize = segmentSize) - val log = new Log(logDir, config, needsRecovery = false, scheduler = time.scheduler, time = time) + val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) log.append(set) @@ -457,7 +463,7 @@ class LogTest extends JUnitSuite { logConfig.copy(segmentSize = set.sizeInBytes * 5, maxIndexSize = 1000, indexInterval = 1), - needsRecovery = false, + recoveryPoint = 0L, time.scheduler, time) @@ -484,7 +490,7 @@ class LogTest extends JUnitSuite { // create a log var log = new Log(logDir, config, - needsRecovery = true, + recoveryPoint = 0L, time.scheduler, time) @@ -494,7 +500,7 @@ class LogTest extends JUnitSuite { log.close() log = new Log(logDir, config, - needsRecovery = true, + recoveryPoint = 0L, time.scheduler, time) log.truncateTo(3) @@ -515,7 +521,7 @@ class LogTest extends JUnitSuite { indexInterval = 10000) val log = new Log(logDir, config, - needsRecovery = true, + recoveryPoint = 0L, time.scheduler, time) @@ -550,7 +556,7 @@ class LogTest extends JUnitSuite { val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, maxIndexSize = 1000) var log = new Log(logDir, config, - needsRecovery = false, + recoveryPoint = 0L, time.scheduler, time) @@ -563,7 +569,7 @@ class LogTest extends JUnitSuite { log = new Log(logDir, config, - needsRecovery = false, + recoveryPoint = 0L, time.scheduler, time) assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) @@ -573,7 +579,7 @@ class LogTest extends JUnitSuite { def testAppendMessageWithNullPayload() { var log = new Log(logDir, LogConfig(), - needsRecovery = false, + recoveryPoint = 0L, time.scheduler, time) log.append(new ByteBufferMessageSet(new Message(bytes = null))) @@ -582,4 +588,36 @@ class LogTest extends JUnitSuite { assertTrue("Message payload should be null.", ms.head.message.isNull) } + @Test + def testCorruptLog() { + // append some messages to create some segments + val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000) + val set = TestUtils.singleMessageSet("test".getBytes()) + val recoveryPoint = 50L + for(iteration <- 0 until 10) { + // create a log and write some messages to it + var log = new Log(logDir, + config, + recoveryPoint = 0L, + time.scheduler, + time) + for(i <- 0 until 100) + log.append(set) + val seg = log.logSegments(0, recoveryPoint).last + val index = seg.index + val messages = seg.log + val filePosition = messages.searchFor(recoveryPoint, 0).position + val indexPosition = index.lookup(recoveryPoint).position + log.close() + + // corrupt file + TestUtils.writeNonsenseToFile(index.file, indexPosition, index.file.length.toInt - indexPosition) + TestUtils.writeNonsenseToFile(messages.file, filePosition, 
messages.file.length().toInt - filePosition) + + // attempt recovery + log = new Log(logDir, config, recoveryPoint, time.scheduler, time) + assertEquals(recoveryPoint, log.logEndOffset) + } + } + } diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 456e538..02c188a 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -37,6 +37,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { defaultConfig = LogConfig(), cleanerConfig = CleanerConfig(), flushCheckMs = 30000, + flushCheckpointMs = 10000L, retentionCheckMs = 30000, scheduler = new KafkaScheduler(1), time = new MockTime)) diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 298ba71..20fe93e 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -55,11 +55,12 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { // send some messages producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*) - // do a clean shutdown and check that the clean shudown file is written out + // do a clean shutdown and check that offset checkpoint file exists server.shutdown() for(logDir <- config.logDirs) { - val cleanShutDownFile = new File(logDir, server.logManager.CleanShutdownFile) - assertTrue(cleanShutDownFile.exists) + val OffsetCheckpointFile = new File(logDir, server.logManager.RecoveryPointCheckpointFile) + assertTrue(OffsetCheckpointFile.exists) + assertTrue(OffsetCheckpointFile.length() > 0) } producer.close() diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 148bb4b..10712e2 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -515,6 +515,15 @@ object TestUtils extends Logging { servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition))), timeout)) } + def writeNonsenseToFile(fileName: File, position: Long, size: Int) { + val file = new RandomAccessFile(fileName, "rw") + file.seek(position) + val rand = new Random + for(i <- 0 until size) + file.writeByte(rand.nextInt(255)) + file.close() + } + } object TestZKUtils { -- 1.7.12.4 (Apple Git-37)
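
A minimal sketch of the idea behind this patch (toy code under stated assumptions; the class, method, and scheduler names below are illustrative and are not Kafka's actual API): appends only advance the log end offset, a roll schedules the fsync of the just-closed segment in the background, and recoveryPoint tracks the first offset not yet known to be on disk, so that on restart only the portion of the log above the checkpointed recovery point needs to be recovered.

import java.util.concurrent.{Executors, TimeUnit}

// Toy model: fsync is decoupled from appends and rolls, and `recoveryPoint`
// records the first offset that has not yet been flushed to disk.
class ToyLog {
  @volatile var logEndOffset: Long = 0L
  @volatile var recoveryPoint: Long = 0L

  def append(messages: Int): Unit = synchronized { logEndOffset += messages }

  // messages appended since the last completed flush
  def unflushedMessages: Long = logEndOffset - recoveryPoint

  // flush everything below `offset` and advance the recovery point
  def flush(offset: Long): Unit = synchronized {
    if (offset > recoveryPoint) {
      // a real log would fsync the segments covering [recoveryPoint, offset) here
      recoveryPoint = offset
    }
  }
}

object RecoveryPointSketch {
  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val log = new ToyLog

    log.append(1000)
    val rollOffset = log.logEndOffset

    // on roll, the flush of the closed segment is scheduled asynchronously
    // rather than performed inline, so rolling stays cheap
    scheduler.schedule(new Runnable { def run(): Unit = log.flush(rollOffset) }, 0, TimeUnit.MILLISECONDS)

    scheduler.shutdown()
    scheduler.awaitTermination(5, TimeUnit.SECONDS)
    println("recoveryPoint=" + log.recoveryPoint + ", unflushed=" + log.unflushedMessages)
  }
}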