From 2d5f9ee2ca5f48f40dbcba96161df799a4400948 Mon Sep 17 00:00:00 2001 From: Ravi Peri Date: Fri, 27 May 2016 15:35:58 -0700 Subject: [PATCH] Kafka 1194 work item to handle OS/environments where updates to memory mapped files are not supported. --- core/src/main/scala/kafka/log/Log.scala | 19 +++++---- core/src/main/scala/kafka/log/LogCleaner.scala | 4 +- core/src/main/scala/kafka/log/LogConfig.scala | 11 +++-- core/src/main/scala/kafka/log/LogSegment.scala | 22 ++++++---- core/src/main/scala/kafka/log/OffsetIndex.scala | 48 +++++++++++++++------- core/src/main/scala/kafka/server/KafkaConfig.scala | 19 ++++++++- core/src/main/scala/kafka/server/KafkaServer.scala | 6 +-- .../main/scala/kafka/tools/DumpLogSegments.scala | 6 ++- .../test/scala/unit/kafka/log/CleanerTest.scala | 3 ++ .../test/scala/unit/kafka/log/LogConfigTest.scala | 22 +++++++--- .../test/scala/unit/kafka/log/LogSegmentTest.scala | 17 +++++--- core/src/test/scala/unit/kafka/log/LogTest.scala | 5 ++- .../scala/unit/kafka/log/OffsetIndexTest.scala | 10 +++-- .../config/server.properties | 1 + tests/kafkatest/services/kafka/config_property.py | 1 + 15 files changed, 137 insertions(+), 57 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 7175b64..3147d5b 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -170,7 +170,8 @@ class Log(val dir: File, // if its a log file, load the corresponding log segment val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong val indexFile = Log.indexFilename(dir, start) - val segment = new LogSegment(dir = dir, + val segment = new LogSegment(config, + dir = dir, startOffset = start, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, @@ -205,12 +206,13 @@ class Log(val dir: File, val fileName = logFile.getName val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix) - val index = new OffsetIndex(file = indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) - val swapSegment = new LogSegment(new FileMessageSet(file = swapFile), + val index = new OffsetIndex(logConfig = config, file = indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) + val swapSegment = new LogSegment(config, + new FileMessageSet(file = swapFile), index = index, baseOffset = startOffset, indexIntervalBytes = config.indexInterval, - rollJitterMs = config.randomSegmentJitter, + rollJitterMs = config.randomSegmentJitter, time = time) info("Found log file %s from interrupted swap operation, repairing.".format(swapFile.getPath)) swapSegment.recover(config.maxMessageSize) @@ -220,7 +222,8 @@ class Log(val dir: File, if(logSegments.size == 0) { // no existing segments, create a new mutable segment beginning at offset 0 - segments.put(0L, new LogSegment(dir = dir, + segments.put(0L, new LogSegment(logConfig = config, + dir = dir, startOffset = 0, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, @@ -629,7 +632,8 @@ class Log(val dir: File, entry.getValue.log.trim() } } - val segment = new LogSegment(dir, + val segment = new LogSegment(config, + dir, startOffset = newOffset, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, @@ -728,7 +732,8 @@ class Log(val dir: File, lock synchronized { val segmentsToDelete = logSegments.toList 
segmentsToDelete.foreach(deleteSegment(_)) - addSegment(new LogSegment(dir, + addSegment(new LogSegment(config, + dir, newOffset, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index d5c247c..62614ba 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -361,8 +361,8 @@ private[log] class Cleaner(val id: Int, val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix) indexFile.delete() val messages = new FileMessageSet(logFile, fileAlreadyExists = false, initFileSize = log.initFileSize(), preallocate = log.config.preallocate) - val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize) - val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time) + val index = new OffsetIndex(log.config, indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize) + val cleaned = new LogSegment(log.config, messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time) try { // clean segments into the new destination segment diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 7fc7e33..2d29f11 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -44,6 +44,7 @@ object Defaults { val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas val CompressionType = kafka.server.Defaults.CompressionType val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable + val MmappedFileUpdateEnable = kafka.server.Defaults.MmapFileUpdateEnable } case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) { @@ -68,8 +69,9 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi val uncleanLeaderElectionEnable = getBoolean(LogConfig.UncleanLeaderElectionEnableProp) val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp) val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase - val preallocate = getBoolean(LogConfig.PreAllocateEnableProp) - + val preallocate = getBoolean(LogConfig.PreAllocateEnableProp) + val MmappedFileUpdateEnable = getBoolean(LogConfig.MmapFileUpdateEnableProp) + def randomSegmentJitter: Long = if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs) } @@ -101,6 +103,7 @@ object LogConfig { val MinInSyncReplicasProp = "min.insync.replicas" val CompressionTypeProp = "compression.type" val PreAllocateEnableProp = "preallocate" + val MmapFileUpdateEnableProp = "memorymapped.file.update.enable" val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log" val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled" @@ -125,6 +128,7 @@ object LogConfig { "standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to " + "no compression; and 'producer' which means retain the original compression codec set by the producer." val PreAllocateEnableDoc ="Should pre allocate file when create new segment?" 
+ val MmapFileUpdateEnableDoc = "Indicates if the underlying OS supports metadata updates (resize, update length) to memory mapped files" private val configDef = { import ConfigDef.Range._ @@ -157,7 +161,8 @@ object LogConfig { .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), MEDIUM, MinInSyncReplicasDoc) .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc) .define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable, - MEDIUM, PreAllocateEnableDoc) + MEDIUM, PreAllocateEnableDoc) + .define(MmapFileUpdateEnableProp, BOOLEAN, Defaults.MmappedFileUpdateEnable, HIGH, MmapFileUpdateEnableDoc) } def apply(): LogConfig = LogConfig(new Properties()) diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 3ea472f..11739dd 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -33,18 +33,20 @@ import java.io.File * * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. * + * @param logConfig log configuration properties * @param log The message set containing log entries * @param index The offset index * @param baseOffset A lower bound on the offsets in this segment * @param indexIntervalBytes The approximate number of bytes between entries in the index - * @param time The time instance + * @param time The time instance * */ @nonthreadsafe -class LogSegment(val log: FileMessageSet, +class LogSegment(val logConfig: LogConfig, + val log: FileMessageSet, val index: OffsetIndex, val baseOffset: Long, val indexIntervalBytes: Int, - val rollJitterMs: Long, + val rollJitterMs: Long, time: Time) extends Logging { var created = time.milliseconds @@ -52,9 +54,10 @@ class LogSegment(val log: FileMessageSet, /* the number of bytes since we last added an entry in the offset index */ private var bytesSinceLastIndexEntry = 0 - def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) = - this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate), - new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), + def this(logConfig: LogConfig, dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) = + this(logConfig, + new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate), + new OffsetIndex(logConfig = logConfig, file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), startOffset, indexIntervalBytes, rollJitterMs, @@ -255,12 +258,15 @@ class LogSegment(val log: FileMessageSet, * Change the suffix for the index and log file for this log segment */ def changeFileSuffixes(oldSuffix: String, newSuffix: String) { + /* close the log file and index before renaming */ + close(); + val logRenamed = log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) if(!logRenamed) throw new 
KafkaStorageException("Failed to change the log file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset)) val indexRenamed = index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix))) if(!indexRenamed) - throw new KafkaStorageException("Failed to change the index file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset)) + throw new KafkaStorageException("Failed to change the index file %s suffix from %s to %s for log segment %d".format(index.file.getPath, oldSuffix, newSuffix, baseOffset)) } /** @@ -287,7 +293,7 @@ class LogSegment(val log: FileMessageSet, def deleteIndex() { val deletedIndex = index.forceDelete() if(!deletedIndex && index.file.exists) - throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.") + throw new KafkaStorageException("Delete of index " + index.file.getAbsolutePath + " failed.") } /** diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 319bdac..dbaf0ff 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -52,10 +52,11 @@ import kafka.common.InvalidOffsetException * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal * storage format. */ -class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging { +class OffsetIndex(val logConfig: LogConfig, @volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging { private val lock = new ReentrantLock - + private val mMappedFileUpdatesEnabled = logConfig.MmappedFileUpdateEnable + /* initialize the memory mapping for this index */ private var mmap: MappedByteBuffer = { @@ -279,8 +280,10 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi val position = this.mmap.position /* Windows won't let us modify the file length while the file is mmapped :-( */ - if(Os.isWindows) + if(!mMappedFileUpdatesEnabled){ forceUnmap(this.mmap) + } + try { raf.setLength(roundedNewSize) this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) @@ -296,9 +299,13 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * Forcefully free the buffer's mmap. We do this only on windows. 
*/ private def forceUnmap(m: MappedByteBuffer) { - try { - if(m.isInstanceOf[sun.nio.ch.DirectBuffer]) - (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean() + try { + if(m.isInstanceOf[sun.nio.ch.DirectBuffer]) { + var cl = (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner(); + if(cl != null){ + cl.clean() + } + } } catch { case t: Throwable => warn("Error when freeing index buffer", t) } @@ -318,15 +325,25 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi */ def delete(): Boolean = { info("Deleting index " + this.file.getAbsolutePath) - if(Os.isWindows) + if(!mMappedFileUpdatesEnabled) CoreUtils.swallow(forceUnmap(this.mmap)) - this.file.delete() + + if(this.file.exists){ + return this.file.delete() + } + + return true } def forceDelete(): Boolean = { - info("Deleting index " + this.file.getAbsolutePath) + info("force deleting index " + this.file.getAbsolutePath) + CoreUtils.swallow(forceUnmap(this.mmap)) - this.file.delete() + if(this.file.exists){ + var deleted = this.file.delete() + } + + return true } /** The number of entries in this index */ @@ -338,15 +355,18 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi def sizeInBytes() = 8 * entries /** Close the index */ - def close() { + def close() { trimToValidSize() + + /* forcefully free the MappedByteBuffer */ + forceUnmap(this.mmap) } /** * Rename the file that backs this offset index * @return true iff the rename was successful */ - def renameTo(f: File): Boolean = { + def renameTo(f: File): Boolean = { val success = this.file.renameTo(f) this.file = f success @@ -378,12 +398,12 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * and this requires synchronizing reads. */ private def maybeLock[T](lock: Lock)(fun: => T): T = { - if(Os.isWindows) + if(!mMappedFileUpdatesEnabled) lock.lock() try { fun } finally { - if(Os.isWindows) + if(!mMappedFileUpdatesEnabled) lock.unlock() } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 32a5a11..4c2b273 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -25,6 +25,7 @@ import kafka.consumer.ConsumerConfig import kafka.coordinator.OffsetConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} import kafka.utils.CoreUtils +import kafka.utils.Os import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.SaslConfigs @@ -98,6 +99,7 @@ object Defaults { val NumRecoveryThreadsPerDataDir = 1 val AutoCreateTopicsEnable = true val MinInSyncReplicas = 1 + val MmapFileUpdateEnable = !Os.isWindows /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMs = RequestTimeoutMs @@ -255,6 +257,8 @@ object KafkaConfig { val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir" val AutoCreateTopicsEnableProp = "auto.create.topics.enable" val MinInSyncReplicasProp = "min.insync.replicas" + val MmapFileUpdateEnableProp = "log.memorymapped.file.update.enable" + /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms" val DefaultReplicationFactorProp = "default.replication.factor" @@ -418,6 +422,8 @@ object KafkaConfig { val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown" val AutoCreateTopicsEnableDoc = 
"Enable auto creation of topic on the server" val MinInSyncReplicasDoc = "define the minimum number of replicas in ISR needed to satisfy a produce request with required.acks=-1 (or all)" + val MmappedFileUpdatesEnableDoc = "Indicates if metadata updates are allowed on a memory mapped file. (False for windows, windows hosted fileshares)." + /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels" val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels" @@ -592,6 +598,7 @@ object KafkaConfig { .define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc) .define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc) .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc) + .define(MmapFileUpdateEnableProp, BOOLEAN, Defaults.MmapFileUpdateEnable, HIGH, MmappedFileUpdatesEnableDoc) /** ********* Replication configuration ***********/ .define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc) @@ -784,7 +791,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val logRetentionTimeMillis = getLogRetentionTimeMillis val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp) val logPreAllocateEnable: java.lang.Boolean = getBoolean(KafkaConfig.LogPreAllocateProp) - + val mmapFileUpdateEnable: java.lang.Boolean = getMmappedFileUpdateEnabled + /** ********* Replication configuration ***********/ val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp) val defaultReplicationFactor: Int = getInt(KafkaConfig.DefaultReplicationFactorProp) @@ -867,6 +875,15 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val listeners = getListeners val advertisedListeners = getAdvertisedListeners + private def getMmappedFileUpdateEnabled: Boolean = { + try { + getBoolean(KafkaConfig.MmapFileUpdateEnableProp) + } + catch { + case e: Exception => !Os.isWindows + } + } + private def getLogRetentionTimeMillis: Long = { val millisInMinute = 60L * 1000L val millisInHour = 60L * millisInMinute diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index c0ae991..dc7feb9 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -74,13 +74,13 @@ object KafkaServer { logProps.put(LogConfig.MinInSyncReplicasProp, kafkaConfig.minInSyncReplicas) logProps.put(LogConfig.CompressionTypeProp, kafkaConfig.compressionType) logProps.put(LogConfig.UncleanLeaderElectionEnableProp, kafkaConfig.uncleanLeaderElectionEnable) - logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable) + logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable) + logProps.put(LogConfig.MmapFileUpdateEnableProp, kafkaConfig.mmapFileUpdateEnable) + logProps } } - - /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required * to start up and shutdown a single Kafka node. 
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index fd15014..9269eda 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -31,6 +31,7 @@ import org.apache.kafka.common.KafkaException import org.apache.kafka.common.utils.Utils import scala.collection.mutable +import java.util.Properties object DumpLogSegments { @@ -118,8 +119,9 @@ object DumpLogSegments { maxMessageSize: Int) { val startOffset = file.getName().split("\\.")(0).toLong val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix) - val messageSet = new FileMessageSet(logFile, false) - val index = new OffsetIndex(file = file, baseOffset = startOffset) + val messageSet = new FileMessageSet(logFile, false) + val config = LogConfig(new Properties()) + val index = new OffsetIndex(config, file = file, baseOffset = startOffset) for(i <- 0 until index.entries) { val entry = index.entry(i) val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, maxMessageSize) diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala index 8ab9f91..80079d1 100755 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala @@ -76,7 +76,10 @@ class CleanerTest extends JUnitSuite { // clean the log cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L) + println("Cleaned segment") + val shouldRemain = keysInLog(log).filter(!keys.contains(_)) + println("got keys") assertEquals(shouldRemain, keysInLog(log)) } diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 51cd62c..5a6d8dd 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -26,6 +26,7 @@ import org.apache.kafka.common.config.ConfigException import org.junit.{Assert, Test} import org.junit.Assert._ import org.scalatest.Assertions._ +import kafka.utils.Os class LogConfigTest { @@ -35,13 +36,24 @@ class LogConfigTest { val kafkaProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") kafkaProps.put(KafkaConfig.LogRollTimeHoursProp, "2") kafkaProps.put(KafkaConfig.LogRollTimeJitterHoursProp, "2") - kafkaProps.put(KafkaConfig.LogRetentionTimeHoursProp, "2") - - val kafkaConfig = KafkaConfig.fromProps(kafkaProps) - val logProps = KafkaServer.copyKafkaConfigToLog(kafkaConfig) + kafkaProps.put(KafkaConfig.LogRetentionTimeHoursProp, "2") + + var kafkaConfig = KafkaConfig.fromProps(kafkaProps) + var logProps = KafkaServer.copyKafkaConfigToLog(kafkaConfig) assertEquals(2 * millisInHour, logProps.get(LogConfig.SegmentMsProp)) assertEquals(2 * millisInHour, logProps.get(LogConfig.SegmentJitterMsProp)) assertEquals(2 * millisInHour, logProps.get(LogConfig.RetentionMsProp)) + assertEquals(!Os.isWindows, logProps.get(LogConfig.MmapFileUpdateEnableProp).asInstanceOf[Boolean]) + + kafkaProps.put(KafkaConfig.MmapFileUpdateEnableProp, "true") + kafkaConfig = KafkaConfig.fromProps(kafkaProps) + logProps = KafkaServer.copyKafkaConfigToLog(kafkaConfig) + assertTrue(logProps.get(LogConfig.MmapFileUpdateEnableProp).asInstanceOf[Boolean]) + + kafkaProps.put(KafkaConfig.MmapFileUpdateEnableProp, "false") + kafkaConfig = KafkaConfig.fromProps(kafkaProps) + logProps = KafkaServer.copyKafkaConfigToLog(kafkaConfig) + 
assertFalse(logProps.get(LogConfig.MmapFileUpdateEnableProp).asInstanceOf[Boolean]) } @Test @@ -65,7 +77,7 @@ class LogConfigTest { } }) } - + private def assertPropertyInvalid(name: String, values: AnyRef*) { values.foreach((value) => { val props = new Properties diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index fa982b1..f1dba4e 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -18,8 +18,10 @@ import org.junit.Assert._ import java.util.concurrent.atomic._ +import java.util.Properties import org.junit.{Test, After} import kafka.utils.TestUtils +import kafka.utils.Os import kafka.message._ import kafka.utils.SystemTime import scala.collection._ @@ -27,15 +29,17 @@ import scala.collection._ class LogSegmentTest { val segments = mutable.ArrayBuffer[LogSegment]() - + val config = LogConfig(new Properties()) + /* create a segment with the given base offset */ def createSegment(offset: Long): LogSegment = { val msFile = TestUtils.tempFile() val ms = new FileMessageSet(msFile) val idxFile = TestUtils.tempFile() idxFile.delete() - val idx = new OffsetIndex(idxFile, offset, 1000) - val seg = new LogSegment(ms, idx, offset, 10, 0, SystemTime) + val idx = new OffsetIndex(config, idxFile, offset, 1000) + + val seg = new LogSegment(config, ms, idx, offset, 10, 0, SystemTime)//, mmappedFileUpdatesEnabled) segments += seg seg } @@ -180,6 +184,7 @@ class LogSegmentTest { val seg = createSegment(40) val logFile = seg.log.file val indexFile = seg.index.file + seg.changeFileSuffixes("", ".deleted") assertEquals(logFile.getAbsolutePath + ".deleted", seg.log.file.getAbsolutePath) assertEquals(indexFile.getAbsolutePath + ".deleted", seg.index.file.getAbsolutePath) @@ -226,7 +231,7 @@ class LogSegmentTest { /* create a segment with pre allocate */ def createSegment(offset: Long, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): LogSegment = { val tempDir = TestUtils.tempDir() - val seg = new LogSegment(tempDir, offset, 10, 1000, 0, SystemTime, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate) + val seg = new LogSegment(config, tempDir, offset, 10, 1000, 0, SystemTime, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate)//, mmappedFileUpdatesEnabled) segments += seg seg } @@ -247,7 +252,7 @@ class LogSegmentTest { @Test def testCreateWithInitFileSizeClearShutdown() { val tempDir = TestUtils.tempDir() - val seg = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, false, 512*1024*1024, true) + val seg = new LogSegment(config, tempDir, 40, 10, 1000, 0, SystemTime, false, 512*1024*1024, true)//, mmappedFileUpdatesEnabled) val ms = messages(50, "hello", "there") seg.append(50, ms) @@ -263,7 +268,7 @@ class LogSegmentTest { //After close, file should be trimed assertEquals(oldSize, seg.log.file.length) - val segReopen = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, true, 512*1024*1024, true) + val segReopen = new LogSegment(config, tempDir, 40, 10, 1000, 0, SystemTime, true, 512*1024*1024, true)//, mmappedFileUpdatesEnabled) segments += segReopen val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index f4427b9..20f2ade 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala 
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -27,6 +27,7 @@ import kafka.message._ import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException, MessageSetSizeTooLargeException} import kafka.utils._ import kafka.server.KafkaConfig +import org.scalatest.junit.JUnitRunner class LogTest extends JUnitSuite { @@ -765,7 +766,7 @@ class LogTest extends JUnitSuite { val config = LogConfig(logProps) val set = TestUtils.singleMessageSet("test".getBytes()) val recoveryPoint = 50L - for(iteration <- 0 until 50) { + for(iteration <- 0 until 50) { // create a log and write some messages to it logDir.mkdirs() var log = new Log(logDir, @@ -787,7 +788,7 @@ class LogTest extends JUnitSuite { log = new Log(logDir, config, recoveryPoint, time.scheduler, time) assertEquals(numMessages, log.logEndOffset) assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList)) - CoreUtils.rm(logDir) + log.delete() } } diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala index dfd7b54..e0aeccb 100644 --- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala @@ -19,22 +19,24 @@ package kafka.log import java.io._ import org.junit.Assert._ -import java.util.{Collections, Arrays} +import java.util.{Collections, Arrays, Properties} import org.junit._ import org.scalatest.junit.JUnitSuite import scala.collection._ import scala.util.Random import kafka.utils.TestUtils import kafka.common.InvalidOffsetException +import kafka.utils.Os class OffsetIndexTest extends JUnitSuite { var idx: OffsetIndex = null val maxEntries = 30 + val config = LogConfig(new Properties()) @Before def setup() { - this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8) + this.idx = new OffsetIndex(logConfig = config, file = nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8) } @After @@ -103,7 +105,7 @@ class OffsetIndexTest extends JUnitSuite { idx.append(first.offset, first.position) idx.append(sec.offset, sec.position) idx.close() - val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset) + val idxRo = new OffsetIndex(logConfig = config, file = idx.file, baseOffset = idx.baseOffset) assertEquals(first, idxRo.lookup(first.offset)) assertEquals(sec, idxRo.lookup(sec.offset)) assertEquals(sec.offset, idxRo.lastOffset) @@ -113,7 +115,7 @@ class OffsetIndexTest extends JUnitSuite { @Test def truncate() { - val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8) + val idx = new OffsetIndex(logConfig = config, file = nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8) idx.truncate() for(i <- 1 until 10) idx.append(i, i) diff --git a/system_test/offset_management_testsuite/config/server.properties b/system_test/offset_management_testsuite/config/server.properties index b6de528..ad9633b 100644 --- a/system_test/offset_management_testsuite/config/server.properties +++ b/system_test/offset_management_testsuite/config/server.properties @@ -137,6 +137,7 @@ replica.fetch.max.bytes=1048576 replica.fetch.wait.max.ms=500 replica.fetch.min.bytes=4096 num.replica.fetchers=1 +log.memorymapped.file.update.enable=false offsets.topic.num.partitions=2 offsets.topic.replication.factor=4 diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py index cc685aa..65ddc1e 
100644 --- a/tests/kafkatest/services/kafka/config_property.py +++ b/tests/kafkatest/services/kafka/config_property.py @@ -101,6 +101,7 @@ From KafkaConfig.scala val LogPreAllocateProp = "log.preallocate" val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir" val MinInSyncReplicasProp = "min.insync.replicas" + val MmapFileUpdateEnableProp = "log.memorymapped.file.update.enable" /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms" val DefaultReplicationFactorProp = "default.replication.factor" -- 2.8.2.windows.1
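A brief usage sketch (not part of the patch; the values shown are illustrative). The broker-level default added here is !Os.isWindows, so the new property only needs to be set explicitly on environments whose filesystems disallow resizing memory-mapped files even though they are not detected as Windows (for example Windows-hosted file shares), or to exercise the unmap-before-update code path in testing, as the system-test server.properties above does:

# server.properties: declare that in-place metadata updates (resize/extend) of
# memory-mapped index files are not supported; the broker will then unmap the
# index buffer before resizing or deleting it, as on Windows.
log.memorymapped.file.update.enable=false

Because the flag is also wired into LogConfig (memorymapped.file.update.enable) through KafkaServer.copyKafkaConfigToLog, it follows the same pattern as other log configs such as preallocate, so a per-log override should be possible wherever LogConfig properties are accepted.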