diff --git a/.reviewboardrc b/.reviewboardrc
deleted file mode 100644
index 5e8d670..0000000
--- a/.reviewboardrc
+++ /dev/null
@@ -1,3 +0,0 @@
-REPOSITORY = 'git://git.apache.org/kafka.git'
-TARGET_GROUPS = 'kafka'
-GUESS_FIELDS = True
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index e1f8b97..a678440 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -42,7 +42,8 @@ class FileMessageSet private[kafka](@volatile var file: File,
                                     private[log] val channel: FileChannel,
                                     private[log] val start: Int,
                                     private[log] val end: Int,
-                                    isSlice: Boolean) extends MessageSet with Logging {
+                                    isSlice: Boolean,
+                                    initFileSize: Long = 0) extends MessageSet with Logging {
 
   /* the size of the message set in bytes */
   private val _size =
@@ -54,7 +55,11 @@ class FileMessageSet private[kafka](@volatile var file: File,
   /* if this is not a slice, update the file pointer to the end of the file */
   if (!isSlice)
     /* set the file position to the last byte in the file */
-    channel.position(channel.size)
+    if (Os.isWindows) {
+      channel.position(math.min(channel.size().toInt, end))
+    }
+    else
+      channel.position(channel.size)
 
   /**
    * Create a file message set with no slicing.
@@ -69,6 +74,13 @@ class FileMessageSet private[kafka](@volatile var file: File,
     this(file, Utils.openChannel(file, mutable = true))
 
   /**
+   * Create a file message set whose underlying file is pre-allocated to initFileSize bytes.
+   * This constructor is for the Windows platform.
+   */
+  def this(file: File, initFileSize: Long) =
+    this(file, Utils.openChannel(file, mutable = true, initFileSize = initFileSize), start = 0, end = 0, isSlice = false)
+
+  /**
    * Create a file message set with mutable option
    */
   def this(file: File, mutable: Boolean) = this(file, Utils.openChannel(file, mutable))
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 46df8d9..f990032 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -59,6 +59,15 @@ class Log(val dir: File,
   /* last time it was flushed */
   private val lastflushedTime = new AtomicLong(time.milliseconds)
 
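+  // Size to pre-allocate for a new segment file on Windows: the configured segment
+  // size minus a safety margin of two maximum-size messages.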
+  private def initFileSize(): Long = {
+    val segmentFileSize = config.segmentSize - 2 * config.maxMessageSize
+    if(segmentFileSize <= 0) {
+      error("Invalid initial log segment file size: " + segmentFileSize)
+      throw new IllegalArgumentException("Invalid initial log segment file size: " + segmentFileSize)
+    }
+    segmentFileSize
+  }
+
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
   loadSegments()
@@ -146,13 +155,27 @@
 
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment beginning at offset 0
-      segments.put(0, new LogSegment(dir = dir,
-                                     startOffset = 0,
-                                     indexIntervalBytes = config.indexInterval,
-                                     maxIndexSize = config.maxIndexSize,
-                                     time = time))
+      if(Os.isWindows) {
+        segments.put(0, new LogSegment(dir = dir,
+                                       startOffset = 0,
+                                       indexIntervalBytes = config.indexInterval,
+                                       maxIndexSize = config.maxIndexSize,
+                                       initFileSize = this.initFileSize(),
+                                       time = time))
+      }
+      else {
+        segments.put(0, new LogSegment(dir = dir,
+                                       startOffset = 0,
+                                       indexIntervalBytes = config.indexInterval,
+                                       maxIndexSize = config.maxIndexSize,
+                                       time = time))
+      }
     } else {
       recoverLog()
+      // Even after a graceful shutdown, the pre-allocated active segment needs to be truncated back to its valid size on broker restart
+      if (Os.isWindows) {
+        activeSegment.recover(config.maxMessageSize)
+      }
      // reset the index size of the currently active log segment to allow more entries
       activeSegment.index.resize(config.maxIndexSize)
     }
@@ -434,7 +457,7 @@ class Log(val dir: File,
    */
   private def maybeRoll(): LogSegment = {
     val segment = activeSegment
-    if (segment.size > config.segmentSize || 
+    if (segment.size > config.segmentSize ||
         segment.size > 0 && time.milliseconds - segment.created > config.segmentMs ||
         segment.index.isFull) {
       debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
@@ -468,14 +491,33 @@ class Log(val dir: File,
 
     }
     segments.lastEntry() match {
-      case null =>
-      case entry => entry.getValue.index.trimToValidSize()
+      case null =>
+      case entry => {
+        if (Os.isWindows &&
+            (entry.getValue.size > 0 && time.milliseconds - entry.getValue.created > config.segmentMs || entry.getValue.index.isFull)) {
+          val truncated = entry.getValue().recover(config.maxMessageSize)
+          debug("truncated " + truncated + " bytes of invalid data")
+        }
+        entry.getValue.index.trimToValidSize()
+      }
+    }
+
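+    // On Windows, roll to a segment whose file is pre-allocated to initFileSize bytes;
+    // other platforms keep the original, non-pre-allocated segment file.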
+    var segment : LogSegment = null
+    if(Os.isWindows) {
+      segment = new LogSegment(dir,
+                               startOffset = newOffset,
+                               indexIntervalBytes = config.indexInterval,
+                               maxIndexSize = config.maxIndexSize,
+                               initFileSize = initFileSize,
+                               time = time)
+    }
+    else {
+      segment = new LogSegment(dir,
+                               startOffset = newOffset,
+                               indexIntervalBytes = config.indexInterval,
+                               maxIndexSize = config.maxIndexSize,
+                               time = time)
     }
-    val segment = new LogSegment(dir,
-                                 startOffset = newOffset,
-                                 indexIntervalBytes = config.indexInterval,
-                                 maxIndexSize = config.maxIndexSize,
-                                 time = time)
     val prev = addSegment(segment)
     if(prev != null)
       throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
@@ -563,11 +605,23 @@ class Log(val dir: File,
     lock synchronized {
       val segmentsToDelete = logSegments.toList
       segmentsToDelete.foreach(deleteSegment(_))
-      addSegment(new LogSegment(dir,
-                                newOffset,
-                                indexIntervalBytes = config.indexInterval,
-                                maxIndexSize = config.maxIndexSize,
-                                time = time))
+
+      if(Os.isWindows) {
+        addSegment(new LogSegment(dir,
+                                  newOffset,
+                                  indexIntervalBytes = config.indexInterval,
+                                  maxIndexSize = config.maxIndexSize,
+                                  time = time,
+                                  initFileSize = initFileSize))
+      }
+      else {
+        addSegment(new LogSegment(dir,
+                                  newOffset,
+                                  indexIntervalBytes = config.indexInterval,
+                                  maxIndexSize = config.maxIndexSize,
+                                  time = time))
+      }
+
       this.nextOffset.set(newOffset)
       this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
     }
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0d6926e..ad6d619 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -54,6 +54,16 @@ class LogSegment(val log: FileMessageSet,
          startOffset,
          indexIntervalBytes,
          time)
+
+  /**
+   * For the Windows platform: takes an initial file size so the segment file can be pre-allocated when rolling a new log segment, which can improve consumer read performance on the NTFS file system.
+   */
+  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, initFileSize: Long, time: Time) =
+    this(new FileMessageSet(file = Log.logFilename(dir, startOffset), initFileSize = initFileSize),
+         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+         startOffset,
+         indexIntervalBytes,
+         time)
 
   /* Return the size in bytes of this log segment */
   def size: Long = log.sizeInBytes()
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index a89b046..f6a8221 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -156,7 +156,23 @@ object Utils extends Logging {
     else
       new FileInputStream(file).getChannel()
   }
-  
+
+  /**
+   * For the Windows platform, pre-allocate the segment file when opening the channel
+   */
+  def openChannel(file: File, mutable: Boolean, initFileSize: Long): FileChannel = {
+    if(mutable)
+      if (Os.isWindows) {
+        debug("create RandomAccessFile with size: " + initFileSize)
+        val randomAccessFile = new RandomAccessFile(file, "rw")
+        randomAccessFile.setLength(initFileSize)
+        randomAccessFile.getChannel()
+      }
+      else
+        new RandomAccessFile(file, "rw").getChannel()
+    else
+      new FileInputStream(file).getChannel()
+  }
   /**
    * Do the given action and log any exceptions thrown without rethrowing them
    * @param log The log method to use for logging. E.g. logger.warn