diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index e1f8b97..cffff3f 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -42,7 +42,8 @@ class FileMessageSet private[kafka](@volatile var file: File,
                                     private[log] val channel: FileChannel,
                                     private[log] val start: Int,
                                     private[log] val end: Int,
-                                    isSlice: Boolean) extends MessageSet with Logging {
+                                    isSlice: Boolean,
+                                    initFileSize: Long = 0) extends MessageSet with Logging {
 
   /* the size of the message set in bytes */
   private val _size =
@@ -54,7 +55,11 @@ class FileMessageSet private[kafka](@volatile var file: File,
   /* if this is not a slice, update the file pointer to the end of the file */
   if (!isSlice)
     /* set the file position to the last byte in the file */
-    channel.position(channel.size)
+    if (Os.isWindows) {
+      channel.position(math.min(channel.size().toInt, end))
+    }
+    else
+      channel.position(channel.size)
 
   /**
    * Create a file message set with no slicing.
@@ -69,6 +74,13 @@ class FileMessageSet private[kafka](@volatile var file: File,
     this(file, Utils.openChannel(file, mutable = true))
 
   /**
+   *
+   * This constructor is for the Windows platform
+   */
+  def this(file: File, initFileSize: Long) =
+    this(file, Utils.openChannel(file, mutable = true, initFileSize = initFileSize), start = 0, end = 0, isSlice = false)
+
+  /**
    * Create a file message set with mutable option
    */
   def this(file: File, mutable: Boolean) = this(file, Utils.openChannel(file, mutable))
@@ -246,9 +258,11 @@ class FileMessageSet private[kafka](@volatile var file: File,
     if(targetSize > originalSize || targetSize < 0)
       throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
                                " size of this log segment is " + originalSize + " bytes.")
+    trace("FileMessageSet before truncate: channel.size %d, file %s, originalSize %d, targetSize %d".format(channel.size(), file.getAbsolutePath, originalSize, targetSize))
     channel.truncate(targetSize)
     channel.position(targetSize)
     _size.set(targetSize)
+    trace("FileMessageSet after truncate: channel.size %d, file %s".format(channel.size(), file.getAbsolutePath))
     originalSize - targetSize
   }
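
Note on the constructor change above: the new Windows constructor opens a pre-allocated file, for which channel.size reports the allocated length rather than the end of the valid data. It therefore passes end = 0, and the write position is clamped with math.min(channel.size().toInt, end) so that appends to a fresh segment start at offset 0 instead of at the allocated length. A minimal standalone sketch of that behavior (file name and sizes are illustrative, not from the patch):

    import java.io.{File, RandomAccessFile}

    object PreallocationDemo extends App {
      val raf = new RandomAccessFile(new File("demo-segment.log"), "rw")
      raf.setLength(1024 * 1024)          // pre-allocate 1 MB, as the patch does on Windows
      val channel = raf.getChannel
      val end = 0                         // a fresh segment holds no valid data yet
      // channel.size is the allocated length (1 MB), not the end of valid data,
      // so the write position must be clamped to the valid region:
      channel.position(math.min(channel.size.toInt, end))
      println(s"size=${channel.size}, position=${channel.position}")
      channel.close()
    }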
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 46df8d9..cc9c6b3 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -59,6 +59,15 @@ class Log(val dir: File,
   /* last time it was flushed */
   private val lastflushedTime = new AtomicLong(time.milliseconds)
 
+  private def initFileSize(): Long = {
+    val segmentFileSize = config.segmentSize - 2 * config.maxMessageSize
+    if(segmentFileSize <= 0) {
+      error("Invalid init log segment file size: " + segmentFileSize)
+      throw new IllegalArgumentException("Invalid init log segment file size: " + segmentFileSize)
+    }
+    segmentFileSize
+  }
+
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
   loadSegments()
@@ -146,11 +155,21 @@ class Log(val dir: File,
 
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment beginning at offset 0
-      segments.put(0, new LogSegment(dir = dir,
-                                     startOffset = 0,
-                                     indexIntervalBytes = config.indexInterval,
-                                     maxIndexSize = config.maxIndexSize,
-                                     time = time))
+      if(Os.isWindows) {
+        segments.put(0, new LogSegment(dir = dir,
+                                       startOffset = 0,
+                                       indexIntervalBytes = config.indexInterval,
+                                       maxIndexSize = config.maxIndexSize,
+                                       initFileSize = this.initFileSize(),
+                                       time = time))
+      }
+      else {
+        segments.put(0, new LogSegment(dir = dir,
+                                       startOffset = 0,
+                                       indexIntervalBytes = config.indexInterval,
+                                       maxIndexSize = config.maxIndexSize,
+                                       time = time))
+      }
     } else {
       recoverLog()
       // reset the index size of the currently active log segment to allow more entries
@@ -202,7 +221,18 @@ class Log(val dir: File,
    * Take care! this is an O(n) operation.
    */
   def numberOfSegments: Int = segments.size
-
+
+  /**
+   * For the Windows platform only: truncate the active segment to the current write position.
+   * On Windows the file is pre-allocated to a fixed size, so truncating here preserves the integrity of the data on disk.
+   */
+  def truncateDatafile() {
+    debug("Truncating last file to end of valid content: " + name)
+    lock synchronized {
+      activeSegment.truncateDatafile()
+    }
+  }
+
   /**
    * Close this log
    */
@@ -468,14 +498,33 @@ class Log(val dir: File,
     }
     segments.lastEntry() match {
-      case null =>
-      case entry => entry.getValue.index.trimToValidSize()
+      case null =>
+      case entry => {
+        if (Os.isWindows &&
+            (entry.getValue.size > 0 && time.milliseconds - entry.getValue.created > config.segmentMs || entry.getValue.index.isFull)) {
+          val truncated = entry.getValue().recover(config.maxMessageSize)
+          debug("truncated " + truncated + " bytes of invalid data")
+        }
+        entry.getValue.index.trimToValidSize()
+      }
+    }
+
+    var segment: LogSegment = null
+    if(Os.isWindows) {
+      segment = new LogSegment(dir,
+                               startOffset = newOffset,
+                               indexIntervalBytes = config.indexInterval,
+                               maxIndexSize = config.maxIndexSize,
+                               initFileSize = initFileSize,
+                               time = time)
+    }
+    else {
+      segment = new LogSegment(dir,
+                               startOffset = newOffset,
+                               indexIntervalBytes = config.indexInterval,
+                               maxIndexSize = config.maxIndexSize,
+                               time = time)
     }
-    val segment = new LogSegment(dir,
-                                 startOffset = newOffset,
-                                 indexIntervalBytes = config.indexInterval,
-                                 maxIndexSize = config.maxIndexSize,
-                                 time = time)
     val prev = addSegment(segment)
     if(prev != null)
       throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
@@ -563,11 +612,23 @@ class Log(val dir: File,
     lock synchronized {
       val segmentsToDelete = logSegments.toList
       segmentsToDelete.foreach(deleteSegment(_))
-      addSegment(new LogSegment(dir,
-                                newOffset,
-                                indexIntervalBytes = config.indexInterval,
-                                maxIndexSize = config.maxIndexSize,
-                                time = time))
+
+      if(Os.isWindows) {
+        addSegment(new LogSegment(dir,
+                                  newOffset,
+                                  indexIntervalBytes = config.indexInterval,
+                                  maxIndexSize = config.maxIndexSize,
+                                  time = time,
+                                  initFileSize = initFileSize))
+      }
+      else {
+        addSegment(new LogSegment(dir,
+                                  newOffset,
+                                  indexIntervalBytes = config.indexInterval,
+                                  maxIndexSize = config.maxIndexSize,
+                                  time = time))
+      }
+
       this.nextOffset.set(newOffset)
      this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
     }
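
The pre-allocation size computed by initFileSize() above is the configured segment size minus twice the maximum message size, presumably so that a segment rolls before an append could run past the pre-allocated region. A worked example of the arithmetic (the config values are illustrative, not defaults asserted by this patch):

    // hypothetical standalone check mirroring initFileSize()
    val segmentSize    = 1024L * 1024 * 1024      // log.segment.bytes: 1 GiB
    val maxMessageSize = 1024L * 1024             // message.max.bytes: 1 MiB
    val initFileSize   = segmentSize - 2 * maxMessageSize
    require(initFileSize > 0, "Invalid init log segment file size: " + initFileSize)
    println(initFileSize)                         // 1071644672 bytes pre-allocated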
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 7cee543..fe00546 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -169,6 +169,10 @@ class LogManager(val logDirs: Array[File],
       Utils.swallow(cleaner.shutdown())
     // flush the logs to ensure latest possible recovery point
     allLogs.foreach(_.flush())
+    // truncate the last log segment to the current write position
+    if (Os.isWindows) {
+      allLogs.foreach(_.truncateDatafile())
+    }
     // close the logs
     allLogs.foreach(_.close())
     // update the last flush point
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0d6926e..1ee22a7 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -54,6 +54,16 @@ class LogSegment(val log: FileMessageSet,
          startOffset,
          indexIntervalBytes,
          time)
+
+  /**
+   * For the Windows platform: adds an initial file size parameter when rolling a new log segment, which can improve consumer read performance on NTFS.
+   */
+  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, initFileSize: Long, time: Time) =
+    this(new FileMessageSet(file = Log.logFilename(dir, startOffset), initFileSize = initFileSize),
+         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+         startOffset,
+         indexIntervalBytes,
+         time)
 
   /* Return the size in bytes of this log segment */
   def size: Long = log.sizeInBytes()
@@ -251,7 +261,15 @@ class LogSegment(val log: FileMessageSet,
     if(!indexRenamed)
       throw new KafkaStorageException("Failed to change the index file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset))
   }
-
+
+  /**
+   * Truncate the file to the end of the valid data.
+   */
+  def truncateDatafile() {
+    logger.debug("truncating file to end position %d".format(log.sizeInBytes()))
+    log.truncateTo(log.sizeInBytes())
+  }
+
   /**
    * Close this log segment
   */
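
On a clean shutdown the new path above (LogManager.shutdown -> Log.truncateDatafile -> LogSegment.truncateDatafile -> FileMessageSet.truncateTo) trims the pre-allocated tail back to the bytes actually written, so a restarted broker sees a file whose length matches its valid data. A standalone illustration of that trim (names and sizes are made up):

    import java.io.{File, RandomAccessFile}

    object TruncateDemo extends App {
      val raf = new RandomAccessFile(new File("demo-segment.log"), "rw")
      raf.setLength(1024 * 1024)        // pre-allocated length
      val channel = raf.getChannel
      val validBytes = 4096L            // bytes actually appended (tracked as _size in FileMessageSet)
      channel.truncate(validBytes)      // drop the zeroed tail; the file now ends at the valid data
      println(channel.size)             // 4096
      channel.close()
    }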
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index a89b046..8df1cca 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -156,6 +156,23 @@ object Utils extends Logging {
     else
       new FileInputStream(file).getChannel()
   }
+
+  /**
+   * For the Windows platform: pre-allocate the segment file when opening the channel
+   */
+  def openChannel(file: File, mutable: Boolean, initFileSize: Long): FileChannel = {
+    if(mutable)
+      if (Os.isWindows) {
+        debug("Creating RandomAccessFile with size " + initFileSize)
+        val randomAccessFile = new RandomAccessFile(file, "rw")
+        randomAccessFile.setLength(initFileSize)
+        randomAccessFile.getChannel()
+      }
+      else
+        new RandomAccessFile(file, "rw").getChannel()
+    else
+      new FileInputStream(file).getChannel()
+  }
 
   /**
    * Do the given action and log any exceptions thrown without rethrowing them
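
For reference, this is how the new overload is exercised by the Windows FileMessageSet constructor above; the segment file name and size here are only illustrative:

    // mutable = true on Windows pre-allocates the file to initFileSize before returning the channel
    val channel = Utils.openChannel(new File("00000000000000000000.log"),
                                    mutable = true,
                                    initFileSize = 1024L * 1024)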