diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index 2522604..a60f99c 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -54,7 +54,7 @@ class FileMessageSet private[kafka](@volatile var file: File, /* if this is not a slice, update the file pointer to the end of the file */ if (!isSlice) /* set the file position to the last byte in the file */ - channel.position(channel.size) + channel.position(math.min(channel.size().toInt, end)) /** * Create a file message set with no slicing. @@ -66,12 +66,25 @@ class FileMessageSet private[kafka](@volatile var file: File, * Create a file message set with no slicing */ def this(file: File) = - this(file, CoreUtils.openChannel(file, mutable = true)) + this(file, FileMessageSet.openChannel(file, mutable = true)) + + /** + * Create a file message set with no slicing, and with initFileSize and preallocate. + * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize + * with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance. + * If it's new file and preallocate is true, end will be set to 0. Otherwise set to Int.MaxValue. + */ + def this(file: File, fileAlreadyExists: Boolean, initFileSize: Long, preallocate: Boolean) = + this(file, + channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate), + start = 0, + end = ( if( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue), + isSlice = false) /** * Create a file message set with mutable option */ - def this(file: File, mutable: Boolean) = this(file, CoreUtils.openChannel(file, mutable)) + def this(file: File, mutable: Boolean) = this(file, FileMessageSet.openChannel(file, mutable)) /** * Create a slice view of the file message set that begins and ends at the given byte offsets @@ -223,10 +236,18 @@ class FileMessageSet private[kafka](@volatile var file: File, */ def close() { flush() + trim() channel.close() } /** + * Trim file when close or roll to next file + */ + def trim() { + truncateTo(sizeInBytes()) + } + + /** * Delete this message set from the filesystem * @return True iff this message set was deleted. */ @@ -272,6 +293,36 @@ class FileMessageSet private[kafka](@volatile var file: File, } } + +object FileMessageSet +{ + /** + * Open a channel for the given file + * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize + * with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance. + * @param file File path + * @param mutable mutable + * @param fileAlreadyExists File already exists or not + * @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024 + * @param preallocate Pre allocate file or not, gotten from configuration. + */ + def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Long = 0, preallocate: Boolean = false): FileChannel = { + if(mutable) + if(fileAlreadyExists) + new RandomAccessFile(file, "rw").getChannel() + else { + if (preallocate) { + val randomAccessFile = new RandomAccessFile(file, "rw") + randomAccessFile.setLength(initFileSize) + randomAccessFile.getChannel() + } + else + new RandomAccessFile(file, "rw").getChannel() + } + else + new FileInputStream(file).getChannel() + } +} object LogFlushStats extends KafkaMetricsGroup { val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 84e7b8f..ed7ae6d 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -78,6 +78,18 @@ class Log(val dir: File, /* last time it was flushed */ private val lastflushedTime = new AtomicLong(time.milliseconds) + private def initFileSize() : Long = { + if(config.preallocate) { + if (config.segmentSize <= 0) { + error("Invalid init log segment file size: " + config.segmentSize ) + throw new IllegalArgumentException("Invalid init log segment file size: " + config.segmentSize ) + } + config.segmentSize + } + else + 0 + } + /* the actual segments of the log */ private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] loadSegments() @@ -168,7 +180,8 @@ class Log(val dir: File, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, - time = time) + time = time, + fileAlreadyExists = true) if(!hasIndex) { error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) segment.recover(config.maxMessageSize) @@ -205,7 +218,10 @@ class Log(val dir: File, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, - time = time)) + time = time, + fileAlreadyExists = false, + initFileSize = this.initFileSize(), + preallocate = config.preallocate)) } else { recoverLog() // reset the index size of the currently active log segment to allow more entries @@ -586,14 +602,20 @@ class Log(val dir: File, segments.lastEntry() match { case null => - case entry => entry.getValue.index.trimToValidSize() + case entry => { + entry.getValue.index.trimToValidSize() + entry.getValue.log.trim() + } } val segment = new LogSegment(dir, startOffset = newOffset, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, - time = time) + time = time, + fileAlreadyExists = false, + initFileSize = initFileSize, + preallocate = config.preallocate) val prev = addSegment(segment) if(prev != null) throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset)) @@ -687,7 +709,10 @@ class Log(val dir: File, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, - time = time)) + time = time, + fileAlreadyExists = false, + initFileSize = initFileSize, + preallocate = config.preallocate)) updateLogEndOffset(newOffset) this.recoveryPoint = math.min(newOffset, this.recoveryPoint) } diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index a907da0..4b48517 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -41,6 +41,7 @@ object Defaults { val UncleanLeaderElectionEnable = true val MinInSyncReplicas = 1 val CompressionType = "producer" + val PreAllocateEnable = false } /** @@ -61,6 +62,7 @@ object Defaults { * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled * @param minInSyncReplicas If number of insync replicas drops below this number, we stop accepting writes with -1 (or all) required acks * @param compressionType compressionType for a given topic + * @param preallocate Should pre allocate file when create new segment * */ case class LogConfig(segmentSize: Int = Defaults.SegmentSize, @@ -79,7 +81,8 @@ case class LogConfig(segmentSize: Int = Defaults.SegmentSize, compact: Boolean = Defaults.Compact, uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable, minInSyncReplicas: Int = Defaults.MinInSyncReplicas, - compressionType: String = Defaults.CompressionType) { + compressionType: String = Defaults.CompressionType, + preallocate: Boolean = Defaults.PreAllocateEnable) { def toProps: Properties = { val props = new Properties() @@ -101,6 +104,7 @@ case class LogConfig(segmentSize: Int = Defaults.SegmentSize, props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString) props.put(MinInSyncReplicasProp, minInSyncReplicas.toString) props.put(CompressionTypeProp, compressionType) + props.put(PreAllocateEnableProp, preallocate.toString) props } @@ -130,6 +134,7 @@ object LogConfig { val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable" val MinInSyncReplicasProp = "min.insync.replicas" val CompressionTypeProp = "compression.type" + val PreAllocateEnableProp = "preallocate" val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log" val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled" @@ -153,6 +158,7 @@ object LogConfig { val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the " + "standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to " + "no compression; and 'producer' which means retain the original compression codec set by the producer." + val PreAllocateEnableDoc ="Should pre allocate file when create new segment?" private val configDef = { import ConfigDef.Range._ @@ -184,6 +190,8 @@ object LogConfig { MEDIUM, UncleanLeaderElectionEnableDoc) .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), MEDIUM, MinInSyncReplicasDoc) .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc) + .define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable, + MEDIUM, PreAllocateEnableDoc) } def configNames() = { @@ -214,7 +222,8 @@ object LogConfig { compact = parsed.get(CleanupPolicyProp).asInstanceOf[String].toLowerCase != Delete, uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean], minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int], - compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String].toLowerCase()) + compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String].toLowerCase(), + preallocate = parsed.get(PreAllocateEnableProp).asInstanceOf[Boolean]) } /** diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index ed03953..63f896e 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -52,8 +52,8 @@ class LogSegment(val log: FileMessageSet, /* the number of bytes since we last added an entry in the offset index */ private var bytesSinceLastIndexEntry = 0 - def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time) = - this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), + def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Long = 0, preallocate: Boolean = false) = + this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate), new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), startOffset, indexIntervalBytes, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 9efa15c..6b02a4f 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -80,6 +80,7 @@ object Defaults { val LogDeleteDelayMs = 60000 val LogFlushSchedulerIntervalMs = Long.MaxValue val LogFlushOffsetCheckpointIntervalMs = 60000 + val LogPreAllocateEnable = false val NumRecoveryThreadsPerDataDir = 1 val AutoCreateTopicsEnable = true val MinInSyncReplicas = 1 @@ -191,6 +192,7 @@ object KafkaConfig { val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms" val LogFlushIntervalMsProp = "log.flush.interval.ms" val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms" + val LogPreAllocateProp = "log.preallocate" val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir" val AutoCreateTopicsEnableProp = "auto.create.topics.enable" val MinInSyncReplicasProp = "min.insync.replicas" @@ -310,6 +312,7 @@ object KafkaConfig { val LogFlushSchedulerIntervalMsDoc = "The frequency in ms that the log flusher checks whether any log needs to be flushed to disk" val LogFlushIntervalMsDoc = "The maximum time in ms that a message in any topic is kept in memory before flushed to disk" val LogFlushOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of the last flush which acts as the log recovery point" + val LogPreAllocateEnableDoc = "Should pre allocate file when create new segment? If you are using Kafka on Windows, you probably need set it to true." val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown" val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server" val MinInSyncReplicasDoc = "define the minimum number of replicas in ISR needed to satisfy a produce request with required.acks=-1 (or all)" @@ -437,6 +440,7 @@ object KafkaConfig { .define(LogFlushSchedulerIntervalMsProp, LONG, Defaults.LogFlushSchedulerIntervalMs, HIGH, LogFlushSchedulerIntervalMsDoc) .define(LogFlushIntervalMsProp, LONG, HIGH, LogFlushIntervalMsDoc, false) .define(LogFlushOffsetCheckpointIntervalMsProp, INT, Defaults.LogFlushOffsetCheckpointIntervalMs, atLeast(0), HIGH, LogFlushOffsetCheckpointIntervalMsDoc) + .define(LogPreAllocateProp, BOOLEAN, Defaults.LogPreAllocateEnable, MEDIUM, LogPreAllocateEnableDoc) .define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc) .define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc) .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc) @@ -557,6 +561,7 @@ object KafkaConfig { logFlushSchedulerIntervalMs = parsed.get(LogFlushSchedulerIntervalMsProp).asInstanceOf[Long], _logFlushIntervalMs = Option(parsed.get(LogFlushIntervalMsProp)).map(_.asInstanceOf[Long]), logFlushOffsetCheckpointIntervalMs = parsed.get(LogFlushOffsetCheckpointIntervalMsProp).asInstanceOf[Int], + logPreAllocateEnable = parsed.get(LogPreAllocateProp).asInstanceOf[Boolean], numRecoveryThreadsPerDataDir = parsed.get(NumRecoveryThreadsPerDataDirProp).asInstanceOf[Int], autoCreateTopicsEnable = parsed.get(AutoCreateTopicsEnableProp).asInstanceOf[Boolean], minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int], @@ -698,6 +703,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val logFlushSchedulerIntervalMs: Long = Defaults.LogFlushSchedulerIntervalMs, private val _logFlushIntervalMs: Option[Long] = None, val logFlushOffsetCheckpointIntervalMs: Int = Defaults.LogFlushOffsetCheckpointIntervalMs, + val logPreAllocateEnable: Boolean = Defaults.LogPreAllocateEnable, val numRecoveryThreadsPerDataDir: Int = Defaults.NumRecoveryThreadsPerDataDir, val autoCreateTopicsEnable: Boolean = Defaults.AutoCreateTopicsEnable, @@ -920,6 +926,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ props.put(LogFlushSchedulerIntervalMsProp, logFlushSchedulerIntervalMs.toString) _logFlushIntervalMs.foreach(v => props.put(LogFlushIntervalMsProp, v.toString)) props.put(LogFlushOffsetCheckpointIntervalMsProp, logFlushOffsetCheckpointIntervalMs.toString) + props.put(LogPreAllocateProp, logPreAllocateEnable.toString) props.put(NumRecoveryThreadsPerDataDirProp, numRecoveryThreadsPerDataDir.toString) props.put(AutoCreateTopicsEnableProp, autoCreateTopicsEnable.toString) props.put(MinInSyncReplicasProp, minInSyncReplicas.toString) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b7d2a28..d7b1150 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -382,7 +382,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg minCleanableRatio = config.logCleanerMinCleanRatio, compact = config.logCleanupPolicy.trim.toLowerCase == "compact", minInSyncReplicas = config.minInSyncReplicas, - compressionType = config.compressionType) + compressionType = config.compressionType, + preallocate = config.logPreAllocateEnable) val defaultProps = defaultLogConfig.toProps val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _)) // read the log configurations from zookeeper diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index d0a8fa7..1255c1e 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -70,16 +70,6 @@ object CoreUtils extends Logging { Utils.daemonThread(name, runnable(fun)) /** - * Open a channel for the given file - */ - def openChannel(file: File, mutable: Boolean): FileChannel = { - if(mutable) - new RandomAccessFile(file, "rw").getChannel() - else - new FileInputStream(file).getChannel() - } - - /** * Do the given action and log any exceptions thrown without rethrowing them * @param log The log method to use for logging. E.g. logger.warn * @param action The action to execute diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala index cec1cae..6bbf32c 100644 --- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala @@ -17,6 +17,7 @@ package kafka.log +import java.io._ import java.nio._ import java.util.concurrent.atomic._ import junit.framework.Assert._ @@ -146,5 +147,57 @@ class FileMessageSetTest extends BaseMessageSetTestCases { assertEquals(List(message), messageSet.toList) assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes) } - + + /** + * Test the new FileMessageSet with pre allocate as true + */ + @Test + def testPreallocateTrue() { + val temp = tempFile() + val set = new FileMessageSet(temp, false, 512 *1024 *1024, true) + val position = set.channel.position + val size = set.sizeInBytes() + assertEquals(0, position) + assertEquals(0, size) + assertEquals(512 *1024 *1024, temp.length) + } + + /** + * Test the new FileMessageSEt with pre allocate as false + */ + @Test + def testPreallocateFalse() { + val temp = tempFile() + val set = new FileMessageSet(temp, false, 512 *1024 *1024, false) + val position = set.channel.position + val size = set.sizeInBytes() + assertEquals(0, position) + assertEquals(0, size) + assertEquals(0, temp.length) + } + + /** + * Test the new FileMessageSet with pre allocate as true and file has been clear shutdown, the file will be truncate to end of valid data. + */ + @Test + def testPreallocateClearShutdown() { + val temp = tempFile() + val set = new FileMessageSet(temp, false, 512 *1024 *1024, true) + set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*)) + val oldposition = set.channel.position + val oldsize = set.sizeInBytes() + assertEquals(messageSet.sizeInBytes, oldposition) + assertEquals(messageSet.sizeInBytes, oldsize) + set.close() + + val tempReopen = new File(temp.getAbsolutePath()) + val setReopen = new FileMessageSet(tempReopen, true, 512 *1024 *1024, true) + val position = setReopen.channel.position + val size = setReopen.sizeInBytes() + + assertEquals(oldposition, position) + assertEquals(oldposition, size) + assertEquals(oldposition, tempReopen.length) + } + } diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 03fb351..38b5f0d 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -226,5 +226,57 @@ class LogSegmentTest extends JUnit3Suite { seg.delete() } } - + + /* create a segment with pre allocate */ + def createSegment(offset: Long, fileAlreadyExists: Boolean = false, initFileSize: Long = 0, preallocate: Boolean = false): LogSegment = { + val tempDir = TestUtils.tempDir() + val seg = new LogSegment(tempDir, offset, 10, 1000, 0, SystemTime, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate) + segments += seg + seg + } + + /* create a segment with pre allocate, put message to it and verify */ + @Test + def testCreateWithInitFileSizeAppendMessage() { + val seg = createSegment(40, false, 512*1024*1024, true) + val ms = messages(50, "hello", "there") + seg.append(50, ms) + val ms2 = messages(60, "alpha", "beta") + seg.append(60, ms2) + val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None) + assertEquals(ms2.toList, read.messageSet.toList) + } + + /* create a segment with pre allocate and clearShutdown*/ + @Test + def testCreateWithInitFileSizeClearShutdown() { + val tempDir = TestUtils.tempDir() + val seg = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, false, 512*1024*1024, true) + + val ms = messages(50, "hello", "there") + seg.append(50, ms) + val ms2 = messages(60, "alpha", "beta") + seg.append(60, ms2) + val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None) + assertEquals(ms2.toList, read.messageSet.toList) + val oldSize = seg.log.sizeInBytes() + val oldPosition = seg.log.channel.position + val oldFileSize = seg.log.file.length + assertEquals(512*1024*1024, oldFileSize) + seg.close() + //After close, file should be trimed + assertEquals(oldSize, seg.log.file.length) + + val segReopen = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, true, 512*1024*1024, true) + segments += segReopen + + val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None) + assertEquals(ms2.toList, readAgain.messageSet.toList) + val size = segReopen.log.sizeInBytes() + val position = segReopen.log.channel.position + val fileSize = segReopen.log.file.length + assertEquals(oldPosition, position) + assertEquals(oldSize, size) + assertEquals(size, fileSize) + } } \ No newline at end of file