diff --git a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala index 30caec1..8b4e765 100644 --- a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala +++ b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala @@ -16,7 +16,7 @@ */ package kafka.server -import kafka.utils.Logging +import kafka.utils.{Utils, Logging} import kafka.common._ import java.util.concurrent.locks.ReentrantLock import java.io._ @@ -36,7 +36,7 @@ object HighwaterMarkCheckpoint { class HighwaterMarkCheckpoint(val path: String) extends Logging { /* create the highwatermark file handle for all partitions */ - val name = path + "/" + HighwaterMarkCheckpoint.highWatermarkFileName + val name = path + File.separator + HighwaterMarkCheckpoint.highWatermarkFileName private val hwFile = new File(name) private val hwFileLock = new ReentrantLock() // recover from previous tmp file, if required @@ -62,8 +62,10 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging { } hwFileWriter.flush() hwFileWriter.close() - // swap new high watermark file with previous one - if(!tempHwFile.renameTo(hwFile)) { + + // File rename is not guaranteed to be atomic on Windows. During a hard crash, if the high watermark file is missing, + // one has to manually rename it from the temporary high watermark file. + if(!Utils.rename(tempHwFile, hwFile)) { fatal("Attempt to swap the new high watermark file with the old one failed") System.exit(1) } diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index e83eb5f..bd5a946 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -256,7 +256,22 @@ object Utils extends Logging { file.delete() } } - + + /** + * rename a file + * @param file original file + * @param newFile new file to be renamed to + * @return a boolean indicates whether it succeeds or nor + */ + def rename(file: File, newFile: File) = { + if (!file.renameTo(newFile)) { + // renameTo() fails on Windows if the destination file exists. + newFile.delete() + file.renameTo(newFile) + } else + true + } + /** * Register the given mbean with the platform mbean server, * unregistering any mbean that was there before. Note,