Index: core/src/main/scala/kafka/log/Log.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 4b03d67e106c6d3b9dd465a308a62b400ead70a4)
+++ core/src/main/scala/kafka/log/Log.scala	(revision e6937d4817792b2d0daf5667153e22402c4ebc07)
@@ -228,6 +228,11 @@
 
   /* A lock that guards all modifications to the log */
   private val lock = new Object
+
+
+  /* A lock that guards all delete actions on the log */
+  private val deleteLock = new Object
+
   // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
   // After memory mapped buffer is closed, no disk IO operation should be performed for this log
   @volatile private var isMemoryMappedBufferClosed = false
@@ -282,6 +287,7 @@
 
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
+  private val deletedSegments: ConcurrentNavigableMap[File, java.lang.Boolean] = new ConcurrentSkipListMap[File, java.lang.Boolean]
 
   // Visible for testing
   @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
@@ -513,61 +519,48 @@
    */
   private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
 
-    def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = {
-      info(s"Deleting index files with suffix $suffix for baseFile $baseFile")
-      val offset = offsetFromFile(baseFile)
-      Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath)
-      Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath)
-      Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath)
-    }
-
     var swapFiles = Set[File]()
-    var cleanFiles = Set[File]()
+    var cleanDirs = Set[File]()
     var minCleanedFileOffset = Long.MaxValue
 
-    for (file <- dir.listFiles if file.isFile) {
-      if (!file.canRead)
-        throw new IOException(s"Could not read file $file")
-      val filename = file.getName
-      if (filename.endsWith(DeletedFileSuffix)) {
-        debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
-        Files.deleteIfExists(file.toPath)
-      } else if (filename.endsWith(CleanedFileSuffix)) {
-        minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset)
-        cleanFiles += file
-      } else if (filename.endsWith(SwapFileSuffix)) {
+    for (segDir <- dir.listFiles if LogSegment.isSegmentDir(segDir)) {
+      val baseOffset = LogSegment.getSegmentOffset(segDir)
+      if (!LogSegment.canReadSegment(segDir))
+        throw new IOException(s"Could not read file $segDir")
+      val segmentStatus = LogSegment.getStatus(segDir)
+      if (segmentStatus == SegmentStatus.DELETED) {
+        debug(s"Deleting stray temporary file ${segDir.getAbsolutePath}")
+        LogSegment.deleteIfExists(segDir)
+      } else if (segmentStatus == SegmentStatus.CLEANED) {
+        minCleanedFileOffset = Math.min(baseOffset, minCleanedFileOffset)
+        cleanDirs += segDir
+      } else if (segmentStatus == SegmentStatus.SWAP) {
         // we crashed in the middle of a swap operation, to recover:
         // if a log, delete the index files, complete the swap operation later
         // if an index just delete the index files, they will be rebuilt
-        val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
-        info(s"Found file ${file.getAbsolutePath} from interrupted swap operation.")
-        if (isIndexFile(baseFile)) {
-          deleteIndicesIfExist(baseFile)
-        } else if (isLogFile(baseFile)) {
-          deleteIndicesIfExist(baseFile)
-          swapFiles += file
+        info(s"Found file ${segDir.getAbsolutePath} from interrupted swap operation.")
+        info(s"Deleting index files from ${segDir.getAbsolutePath}")
+        LogSegment.deleteIndicesIfExist(segDir)
+        if(LogSegment.isSegmentFileExists(segDir, SegmentFile.LOG)){
+          swapFiles += segDir
         }
       }
     }
-
     // KAFKA-6264: Delete all .swap files whose base offset is greater than the minimum .cleaned segment offset. Such .swap
     // files could be part of an incomplete split operation that could not complete. See Log#splitOverflowedSegment
     // for more details about the split operation.
-    val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset)
-    invalidSwapFiles.foreach { file =>
-      debug(s"Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
-      val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
-      deleteIndicesIfExist(baseFile, SwapFileSuffix)
-      Files.deleteIfExists(file.toPath)
+    val (invalidSwapFiles, validSwapDirs) = swapFiles.partition(segDir => LogSegment.getSegmentOffset(segDir) >= minCleanedFileOffset)
+    invalidSwapFiles.foreach { segDir =>
+      debug(s"Deleting invalid swap file ${segDir.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
+      LogSegment.deleteIfExists(segDir)
     }
 
     // Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files
-    cleanFiles.foreach { file =>
-      debug(s"Deleting stray .clean file ${file.getAbsolutePath}")
-      Files.deleteIfExists(file.toPath)
+    cleanDirs.foreach { segDir =>
+      debug(s"Deleting stray .clean file ${segDir.getAbsolutePath}")
+      LogSegment.deleteIfExists(segDir)
     }
-
-    validSwapFiles
+    validSwapDirs
   }
 
   /**
@@ -580,37 +573,37 @@
   private def loadSegmentFiles(): Unit = {
     // load segments in ascending order because transactional data from one segment may depend on the
     // segments that come before it
-    for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
-      if (isIndexFile(file)) {
+    for (segDir <- dir.listFiles if LogSegment.isSegmentDir(segDir)) {
+      val baseOffset = LogSegment.getSegmentOffset(segDir)
+      if (!LogSegment.isSegmentFileExists(segDir, SegmentFile.LOG)) {
         // if it is an index file, make sure it has a corresponding .log file
-        val offset = offsetFromFile(file)
-        val logFile = Log.logFile(dir, offset)
-        if (!logFile.exists) {
-          warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
-          Files.deleteIfExists(file.toPath)
-        }
-      } else if (isLogFile(file)) {
-        // if it's a log file, load the corresponding log segment
-        val baseOffset = offsetFromFile(file)
-        val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()
-        val segment = LogSegment.open(dir = dir,
-          baseOffset = baseOffset,
-          config,
-          time = time,
-          fileAlreadyExists = true)
-
-        try segment.sanityCheck(timeIndexFileNewlyCreated)
-        catch {
-          case _: NoSuchFileException =>
-            error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
-              "recovering segment and rebuilding index files...")
-            recoverSegment(segment)
-          case e: CorruptIndexException =>
-            warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
-              s"to ${e.getMessage}}, recovering segment and rebuilding index files...")
-            recoverSegment(segment)
-        }
-        addSegment(segment)
+        warn(s"Found an orphaned index file ${segDir.getAbsolutePath}, with no corresponding log file.")
+        LogSegment.deleteIfExists(segDir)
+      } else {
+        val status = LogSegment.getStatus(segDir)
+        if(status == SegmentStatus.HOT){
+          // if it's a log file, load the corresponding log segment
+          val timeIndexFileNewlyCreated = !LogSegment.isSegmentFileExists(segDir, SegmentFile.TIME_INDEX)
+          val segment = LogSegment.open(segDir = segDir,
+            baseOffset = baseOffset,
+            config,
+            time = time,
+            fileAlreadyExists = true)
+          try segment.sanityCheck(timeIndexFileNewlyCreated)
+          catch {
+            case _: NoSuchFileException =>
+              error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
+                "recovering segment and rebuilding index files...")
+              recoverSegment(segment)
+            case e: CorruptIndexException =>
+              warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
+                s"to ${e.getMessage}}, recovering segment and rebuilding index files...")
+              recoverSegment(segment)
+          }
+          addSegment(segment)
+        }else if(status == SegmentStatus.DELETED){
+          deletedSegments.put(segDir, java.lang.Boolean.TRUE)
+        }
       }
     }
   }
@@ -643,16 +636,14 @@
    * this situation. This is expected to be an extremely rare scenario in practice,
    * and manual intervention might be required to get out of it.
    */
-  private def completeSwapOperations(swapFiles: Set[File]): Unit = {
-    for (swapFile <- swapFiles) {
-      val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
-      val baseOffset = offsetFromFile(logFile)
-      val swapSegment = LogSegment.open(swapFile.getParentFile,
+  private def completeSwapOperations(segDirs: Set[File]): Unit = {
+    for (segDir <- segDirs) {
+      val baseOffset = LogSegment.getSegmentOffset(segDir)
+      val swapSegment = LogSegment.open(segDir = segDir,
         baseOffset = baseOffset,
         config,
-        time = time,
-        fileSuffix = SwapFileSuffix)
-      info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
+        time = time)
+      info(s"Found log file ${segDir.getPath} from interrupted swap operation, repairing.")
       recoverSegment(swapSegment)
 
       // We create swap files for two cases:
@@ -680,7 +671,7 @@
   private def loadSegments(): Long = {
     // first do a pass through the files in the log directory and remove any temporary files
     // and find any interrupted swap operations
-    val swapFiles = removeTempFilesAndCollectSwapFiles()
+    val swapDirs = removeTempFilesAndCollectSwapFiles()
 
     // Now do a second pass and load all the log and index files.
    // We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
@@ -697,7 +688,7 @@
     // Finally, complete any interrupted swap operations. To be crash-safe,
     // log files that are replaced by the swap segment should be renamed to .deleted
     // before the swap file is restored as the new segment file.
-    completeSwapOperations(swapFiles)
+    completeSwapOperations(swapDirs)
 
     if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
       val nextOffset = retryOnOffsetOverflow {
@@ -709,7 +700,7 @@
       nextOffset
     } else {
       if (logSegments.isEmpty) {
-        addSegment(LogSegment.open(dir = dir,
+        addSegment(LogSegment.open(segDir = LogSegment.getSegmentDir(dir, 0),
           baseOffset = 0,
           config,
           time = time,
@@ -793,7 +784,7 @@
 
     if (logSegments.isEmpty) {
       // no existing segments, create a new mutable segment beginning at logStartOffset
-      addSegment(LogSegment.open(dir = dir,
+      addSegment(LogSegment.open(segDir = LogSegment.getSegmentDir(dir, logStartOffset),
         baseOffset = logStartOffset,
         config,
         time = time,
@@ -1864,7 +1855,6 @@
     lock synchronized {
       checkIfMemoryMappedBufferClosed()
       val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
-      val logFile = Log.logFile(dir, newOffset)
 
       if (segments.containsKey(newOffset)) {
         // segment with the same base offset already exists and loaded
@@ -1886,13 +1876,10 @@
           s"Trying to roll a new log segment for topic partition $topicPartition with " +
           s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")
       } else {
-        val offsetIdxFile = offsetIndexFile(dir, newOffset)
-        val timeIdxFile = timeIndexFile(dir, newOffset)
-        val txnIdxFile = transactionIndexFile(dir, newOffset)
-
-        for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
-          warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
-          Files.delete(file.toPath)
+        val newSegDir = new File(dir, String.valueOf(newOffset))
+        if (newSegDir.exists()) {
+          warn(s"Newly rolled segment ${newSegDir.getAbsolutePath} already exists; deleting it first")
+          LogSegment.deleteIfExists(newSegDir)
         }
 
         Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())
@@ -1906,7 +1893,7 @@
         producerStateManager.updateMapEndOffset(newOffset)
         producerStateManager.takeSnapshot()
 
-        val segment = LogSegment.open(dir,
+        val segment = LogSegment.open(LogSegment.getSegmentDir(dir, newOffset),
           baseOffset = newOffset,
           config,
           time = time,
@@ -2083,7 +2070,7 @@
     lock synchronized {
       checkIfMemoryMappedBufferClosed()
       removeAndDeleteSegments(logSegments, asyncDelete = true)
-      addSegment(LogSegment.open(dir,
+      addSegment(LogSegment.open(LogSegment.getSegmentDir(dir, newOffset),
         baseOffset = newOffset,
         config = config,
         time = time,
@@ -2186,8 +2173,9 @@
       val toDelete = segments.toList
       toDelete.foreach { segment =>
         this.segments.remove(segment.baseOffset)
+        segment.closeHandlers()
+        deleteSegment(segment.segDir, asyncDelete)
       }
-      deleteSegmentFiles(toDelete, asyncDelete)
     }
   }
 
@@ -2201,25 +2189,29 @@
    *
    * @throws IOException if the file can't be renamed and still exists
    */
-  private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = {
-    segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
-
+  def deleteSegment(segDir: File, asyncDelete: Boolean): Unit = {
     def deleteSegments(): Unit = {
-      info(s"Deleting segments $segments")
-      maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
-        segments.foreach(_.deleteIfExists())
+      deleteLock synchronized {
+        deletedSegments.keySet().forEach( segDir =>{
+          info(s"Deleting segment ${topicPartition.toString} : $segDir")
+          try {
+            LogSegment.deleteIfExists(segDir)
+            deletedSegments.remove(segDir)
+          }catch{
+            case e: Throwable => warn(s"Unable to delete segment ${topicPartition.toString} : $segDir, Reason : ${e.getClass.getName}")
+          }
+        })
       }
     }
-
-    if (asyncDelete) {
-      info(s"Scheduling segments for deletion $segments")
-      scheduler.schedule("delete-file", () => deleteSegments, delay = config.fileDeleteDelayMs)
-    } else {
-      deleteSegments()
+    LogSegment.setStatus(segDir, SegmentStatus.DELETED)
+    this.deletedSegments.put(segDir, java.lang.Boolean.TRUE)
+    if(asyncDelete){
+      scheduler.schedule("delete-file", () => deleteSegments(), delay = config.fileDeleteDelayMs)
+    }else{
+      LogSegment.deleteIfExists(segDir)
     }
   }
-
-  /**
+  /**
    * Swap one or more new segment in place and delete one or more existing segments in a crash-safe manner. The old
    * segments will be asynchronously deleted.
    *
@@ -2263,7 +2255,8 @@
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
       if (!isRecoveredSwapFile)
-        sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix))
+        sortedNewSegments.reverse.foreach(_.changeSegmentStatus(SegmentStatus.CLEANED, SegmentStatus.SWAP))
+
       sortedNewSegments.reverse.foreach(addSegment(_))
 
       // delete the old files
@@ -2272,10 +2265,11 @@
         if (seg.baseOffset != sortedNewSegments.head.baseOffset)
           segments.remove(seg.baseOffset)
         // delete segment files
-        deleteSegmentFiles(List(seg), asyncDelete = true)
+        seg.closeHandlers()
+        deleteSegment(seg.segDir, asyncDelete = true)
       }
       // okay we are safe now, remove the swap suffix
-      sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+      sortedNewSegments.foreach(_.changeSegmentStatus(SegmentStatus.SWAP, SegmentStatus.HOT))
     }
   }
 
@@ -2347,7 +2341,7 @@
    * @return List of new segments that replace the input segment
    */
   private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = {
-    require(isLogFile(segment.log.file), s"Cannot split file ${segment.log.file.getAbsoluteFile}")
+    require(segment.getSegmentStatus() == SegmentStatus.HOT, s"Cannot split file ${segment.log.file.getAbsoluteFile}")
     require(segment.hasOverflow, "Split operation is only permitted for segments with overflow")
 
     info(s"Splitting overflowed segment $segment")
@@ -2614,15 +2608,6 @@
     new TopicPartition(topic, partition)
   }
-
-  private def isIndexFile(file: File): Boolean = {
-    val filename = file.getName
-    filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix) || filename.endsWith(TxnIndexFileSuffix)
-  }
-
-  private def isLogFile(file: File): Boolean =
-    file.getPath.endsWith(LogFileSuffix)
-
 }
 
 object LogMetricNames {
Index: core/src/main/scala/kafka/log/LogCleaner.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/log/LogCleaner.scala	(revision 4b03d67e106c6d3b9dd465a308a62b400ead70a4)
+++ core/src/main/scala/kafka/log/LogCleaner.scala	(revision e6937d4817792b2d0daf5667153e22402c4ebc07)
@@ -432,9 +432,14 @@
   }
 
   def createNewCleanedSegment(log: Log, baseOffset: Long): LogSegment = {
-    LogSegment.deleteIfExists(log.dir, baseOffset, fileSuffix = Log.CleanedFileSuffix)
-    LogSegment.open(log.dir, baseOffset, log.config, Time.SYSTEM, fileAlreadyExists = false,
-      fileSuffix = Log.CleanedFileSuffix, initFileSize = log.initFileSize, preallocate = log.config.preallocate)
+    val segDir = LogSegment.getSegmentDir(log.dir, baseOffset, randomDigits = true)
+    val tobeRemoved = LogSegment.getCleanedSegmentFiles(log.dir, baseOffset)
+    if(tobeRemoved != null){
+      tobeRemoved.foreach( segDir => log.deleteSegment(segDir, asyncDelete = true))
+    }
+    val segment = LogSegment.open(segDir, baseOffset, log.config, Time.SYSTEM, fileAlreadyExists = false,
+      initFileSize = log.initFileSize, preallocate = log.config.preallocate, segmentStatus = SegmentStatus.CLEANED)
+    segment
   }
 }
Index: core/src/main/scala/kafka/log/LogSegment.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/log/LogSegment.scala	(revision 4b03d67e106c6d3b9dd465a308a62b400ead70a4)
+++ core/src/main/scala/kafka/log/LogSegment.scala	(revision d6a4582a34b3f6904c3d5965f977a9ab4bdf6e76)
@@ -17,11 +17,13 @@
 package kafka.log
 
 import java.io.{File, IOException}
-import java.nio.file.{Files, NoSuchFileException}
+import java.nio.file.{Files, NoSuchFileException, Path}
 import java.nio.file.attribute.FileTime
+import java.util
+import java.util.Comparator
 import java.util.concurrent.TimeUnit
 
-import kafka.common.LogSegmentOffsetOverflowException
+import kafka.common.{KafkaException, LogSegmentOffsetOverflowException}
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{FetchDataInfo, LogOffsetMetadata}
@@ -34,6 +36,7 @@
 
 import scala.collection.JavaConverters._
 import scala.math._
+import scala.util.Random
 
 /**
  * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing
@@ -60,12 +63,14 @@
                  val baseOffset: Long,
                  val indexIntervalBytes: Int,
                  val rollJitterMs: Long,
-                 val time: Time) extends Logging {
+                 val time: Time, dir: File) extends Logging {
 
   def offsetIndex: OffsetIndex = lazyOffsetIndex.get
 
   def timeIndex: TimeIndex = lazyTimeIndex.get
 
+  def segDir: File = dir
+
   def shouldRoll(rollParams: RollParams): Boolean = {
     val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
     size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
@@ -487,6 +492,37 @@
     txnIndex.file = new File(dir, txnIndex.file.getName)
   }
 
+  /**
+   * Get the status of this log segment
+   * IOException from this method should be handled by the caller
+   */
+  def getSegmentStatus(): SegmentStatus = {
+    SegmentStatusHandler.getStatus(new File(segDir, SegmentFile.STATUS.getName))
+  }
+
+
+  /**
+   * Change the status of this log segment
+   * IOException from this method should be handled by the caller
+   */
+  def changeSegmentStatus(segmentStatus: SegmentStatus): Unit = {
+    changeSegmentStatus(SegmentStatus.HOT, segmentStatus)
+  }
+
+  /**
+   * Change the status of this log segment
+   * IOException from this method should be handled by the caller
+   */
+  def changeSegmentStatus(oldStatus: SegmentStatus, newStatus: SegmentStatus): Unit = {
+    val file = new File(segDir, SegmentFile.STATUS.getName)
+    val segStatus = SegmentStatusHandler.getStatus(file)
+    if(segStatus == oldStatus){
+      SegmentStatusHandler.setStatus(file, newStatus)
+    }else{
+      throw new KafkaException(s"Invalid Segment Status $segStatus, expected $oldStatus")
+    }
+  }
+
   /**
    * Change the suffix for the index and log file for this log segment
    * IOException from this method should be handled by the caller
@@ -650,26 +686,113 @@
 
 object LogSegment {
 
-  def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
-           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
+  private val random = scala.util.Random
+  def open(segDir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
+           initFileSize: Int = 0, preallocate: Boolean = false, segmentStatus: SegmentStatus = SegmentStatus.HOT): LogSegment = {
     val maxIndexSize = config.maxIndexSize
+    if(!fileAlreadyExists){
+      segDir.mkdirs()
+      SegmentStatusHandler.setStatus(new File(segDir, SegmentFile.STATUS.getName), segmentStatus)
+    }
     new LogSegment(
-      FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
-      LazyIndex.forOffset(Log.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
-      LazyIndex.forTime(Log.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
-      new TransactionIndex(baseOffset, Log.transactionIndexFile(dir, baseOffset, fileSuffix)),
+      FileRecords.open(new File(segDir, SegmentFile.LOG.getName), fileAlreadyExists, initFileSize, preallocate),
+      LazyIndex.forOffset(new File(segDir, SegmentFile.OFFSET_INDEX.getName), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
+      LazyIndex.forTime(new File(segDir, SegmentFile.TIME_INDEX.getName), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
+      new TransactionIndex(baseOffset, new File(segDir, SegmentFile.TXN_INDEX.getName)),
       baseOffset,
       indexIntervalBytes = config.indexInterval,
       rollJitterMs = config.randomSegmentJitter,
-      time)
+      time, segDir)
+  }
+
+  def isSegmentDir(file: File): Boolean = {
+    file.isDirectory && !file.getName.equals("snapshot")
+  }
+
+  def getCleanedSegmentFiles(logDir: File, offset: Long): Array[File] = {
+    val fNameDir = String.valueOf(offset)
+    val fNameDyn = fNameDir+"-"
+    logDir.listFiles( file => (file.getName.startsWith(fNameDyn) || file.getName.equals(fNameDir)) && getStatus(file) == SegmentStatus.CLEANED)
+  }
+
+  def getSegmentDir(logDir: File, baseOffset: Long, randomDigits: Boolean = false): File = {
+    new File(logDir, if(randomDigits) baseOffset+"-"+random.nextInt(100000) else String.valueOf(baseOffset))
+  }
+
+  def getSegmentOffset(segDir: File): Long ={
+    val name = segDir.getName
+    val index = name.indexOf("-")
+    if(index > -1){
+      name.substring(0, index).toLong
+    }else{
+      name.toLong
+    }
+  }
+
+  def deleteIfExists(segDir: File): Unit = {
+    if(segDir.exists()){
+      val files = segDir.listFiles()
+      if(files != null){
+        files.foreach( file => Files.deleteIfExists(file.toPath))
+      }
+      Files.deleteIfExists(segDir.toPath)
+    }
+  }
+
+  def deleteIndicesIfExist(segDir: File): Unit = {
+    Files.deleteIfExists(new File(segDir, SegmentFile.OFFSET_INDEX.getName).toPath)
+    Files.deleteIfExists(new File(segDir, SegmentFile.TIME_INDEX.getName).toPath)
+    Files.deleteIfExists(new File(segDir, SegmentFile.TXN_INDEX.getName).toPath)
+  }
+
+  def getSnapshotDir(logDir: File): File = {
+    new File(logDir, "snapshot")
+  }
+
+  def getSnapshotFile(logDir: File, baseOffset: Long): File = {
+    val segDir = new File(logDir, "snapshot")
+    new File(segDir, String.valueOf(baseOffset))
   }
 
-  def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = {
-    Log.deleteFileIfExists(Log.offsetIndexFile(dir, baseOffset, fileSuffix))
-    Log.deleteFileIfExists(Log.timeIndexFile(dir, baseOffset, fileSuffix))
-    Log.deleteFileIfExists(Log.transactionIndexFile(dir, baseOffset, fileSuffix))
-    Log.deleteFileIfExists(Log.logFile(dir, baseOffset, fileSuffix))
+  def getSnapshotOffset(snapshot : File): Long = {
+    snapshot.getName.toLong
   }
+
+  def getStatus(segDir: File): SegmentStatus = {
+    val statusFile = new File(segDir, SegmentFile.STATUS.getName)
+    if(statusFile.exists()){
+      SegmentStatusHandler.getStatus(statusFile)
+    }else{
+      SegmentStatus.UNKNOWN
+    }
+  }
+
+  def setStatus(segDir: File, status: SegmentStatus): Unit = {
+    val statusFile = new File(segDir, SegmentFile.STATUS.getName)
+    SegmentStatusHandler.setStatus(statusFile, status)
+  }
+
+
+  def isSegmentFileExists(dir: File, baseOffset: Long, segmentFile: SegmentFile): Boolean = {
+    val segDir = new File(dir, String.valueOf(baseOffset))
+    new File(segDir, segmentFile.getName).exists
+  }
+  def isSegmentFileExists(segDir: File, segmentFile: SegmentFile): Boolean = {
+    new File(segDir, segmentFile.getName).exists
+  }
+
+  def canReadSegment(segDir: File): Boolean = {
+    val files = segDir.listFiles
    var status = true
+    if(files != null){
+      for (file <- files if file.isFile){
+        status = status && file.canRead
+      }
+    }
+    status
+  }
+
+
 }
 
 object LogFlushStats extends KafkaMetricsGroup {
Index: core/src/main/scala/kafka/log/ProducerStateManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/log/ProducerStateManager.scala	(revision 4b03d67e106c6d3b9dd465a308a62b400ead70a4)
+++ core/src/main/scala/kafka/log/ProducerStateManager.scala	(revision 4259718d6ba5edbeccf9176ef27ef1b00c5ad41d)
@@ -439,13 +439,12 @@
     finally fileChannel.close()
   }
 
-  private def isSnapshotFile(file: File): Boolean = file.getName.endsWith(Log.ProducerSnapshotFileSuffix)
-
   // visible for testing
   private[log] def listSnapshotFiles(dir: File): Seq[File] = {
-    if (dir.exists && dir.isDirectory) {
-      Option(dir.listFiles).map { files =>
-        files.filter(f => f.isFile && isSnapshotFile(f)).toSeq
+    val snapshotDir = LogSegment.getSnapshotDir(dir)
+    if (snapshotDir.exists && snapshotDir.isDirectory) {
+      Option(snapshotDir.listFiles).map { files =>
+        files.filter(f => f.isFile).toSeq
       }.getOrElse(Seq.empty)
     } else Seq.empty
   }
@@ -454,7 +453,7 @@
   private[log] def deleteSnapshotsBefore(dir: File, offset: Long): Unit = deleteSnapshotFiles(dir, _ < offset)
 
   private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => true): Unit = {
-    listSnapshotFiles(dir).filter(file => predicate(offsetFromFile(file))).foreach { file =>
+    listSnapshotFiles(dir).filter(file => predicate(LogSegment.getSegmentOffset(file))).foreach { file =>
       Files.deleteIfExists(file.toPath)
     }
   }
@@ -548,7 +547,7 @@
           info(s"Loading producer state from snapshot file '$file'")
           val loadedProducers = readSnapshot(file).filter { producerEntry => !isProducerExpired(currentTime, producerEntry) }
           loadedProducers.foreach(loadProducerEntry)
-          lastSnapOffset = offsetFromFile(file)
+          lastSnapOffset = offsetFromSnapshotFile(file)
           lastMapOffset = lastSnapOffset
           return
         } catch {
@@ -655,7 +654,11 @@
   def takeSnapshot(): Unit = {
     // If not a new offset, then it is not worth taking another snapshot
     if (lastMapOffset > lastSnapOffset) {
-      val snapshotFile = Log.producerSnapshotFile(logDir, lastMapOffset)
+      val snapshotFile = LogSegment.getSnapshotFile(logDir, lastMapOffset)
+      val parentDir = snapshotFile.getParentFile
+      if(!parentDir.exists()){
+        parentDir.mkdirs()
+      }
       info(s"Writing producer snapshot at offset $lastMapOffset")
       writeSnapshot(snapshotFile, producers)
 
@@ -667,12 +670,12 @@
   /**
    * Get the last offset (exclusive) of the latest snapshot file.
    */
-  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromFile(file))
+  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromSnapshotFile(file))
 
   /**
    * Get the last offset (exclusive) of the oldest snapshot file.
    */
-  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file))
+  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromSnapshotFile(file))
 
   /**
    * When we remove the head of the log due to retention, we need to remove snapshots older than
@@ -740,7 +743,7 @@
   private def oldestSnapshotFile: Option[File] = {
     val files = listSnapshotFiles
     if (files.nonEmpty)
-      Some(files.minBy(offsetFromFile))
+      Some(files.minBy(offsetFromSnapshotFile))
     else
       None
   }
@@ -748,11 +751,15 @@
   private def latestSnapshotFile: Option[File] = {
     val files = listSnapshotFiles
     if (files.nonEmpty)
-      Some(files.maxBy(offsetFromFile))
+      Some(files.maxBy(offsetFromSnapshotFile))
     else
       None
   }
 
   private def listSnapshotFiles: Seq[File] = ProducerStateManager.listSnapshotFiles(logDir)
+
+  def offsetFromSnapshotFile(file: File): Long = {
+    LogSegment.getSnapshotOffset(file)
+  }
+
 }
Index: core/src/main/scala/kafka/log/SegmentFile.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/log/SegmentFile.java	(revision 317549c83e7f66f1cabd0c6fdf76dc84ab4d58e1)
+++ core/src/main/scala/kafka/log/SegmentFile.java	(revision 317549c83e7f66f1cabd0c6fdf76dc84ab4d58e1)
@@ -0,0 +1,18 @@
+package kafka.log;
+
+import java.util.Objects;
+
+public enum SegmentFile {
+
+    LOG("log"), OFFSET_INDEX("index"), TIME_INDEX("timeindex"),
+    TXN_INDEX("txnindex"), STATUS("status");
+
+    private final String name;
+    SegmentFile(String name) {
+        this.name = Objects.requireNonNull(name);
+    }
+
+    public String getName() {
+        return name;
+    }
+}
Index: core/src/main/scala/kafka/log/SegmentStatus.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/log/SegmentStatus.java	(revision e6937d4817792b2d0daf5667153e22402c4ebc07)
+++ core/src/main/scala/kafka/log/SegmentStatus.java	(revision e6937d4817792b2d0daf5667153e22402c4ebc07)
@@ -0,0 +1,31 @@
+package kafka.log;
+
+public enum SegmentStatus {
+    HOT(1), DELETED(2), CLEANED(3), SWAP(4), UNKNOWN(5);
+
+    private final int status;
+    SegmentStatus(int status) {
+        this.status = status;
+    }
+
+    public int getStatus() {
+        return status;
+    }
+
+    public static SegmentStatus getStatus(int id){
+        switch (id){
+            case 1:
+                return HOT;
+            case 2:
+                return DELETED;
+            case 3:
+                return CLEANED;
+            case 4:
+                return SWAP;
+            case 5:
+                return UNKNOWN;
+            default:
+                throw new RuntimeException("Invalid file status : "+id);
+        }
+    }
+}
Index: core/src/main/scala/kafka/log/SegmentStatusHandler.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/log/SegmentStatusHandler.java	(revision 7110b943f2852da62e7e079e1e00ea65c874e821)
+++ core/src/main/scala/kafka/log/SegmentStatusHandler.java	(revision 7110b943f2852da62e7e079e1e00ea65c874e821)
@@ -0,0 +1,23 @@
+package kafka.log;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+
+public class SegmentStatusHandler {
+
+    public static SegmentStatus getStatus(File file) throws IOException {
+        if(!file.exists()) {
+            return SegmentStatus.UNKNOWN;
+        }else{
+            final byte[] data = Files.readAllBytes(file.toPath());
+            return SegmentStatus.getStatus(ByteBuffer.wrap(data).getInt());
+        }
+    }
+
+    public static void setStatus(File file, SegmentStatus status) throws IOException {
+        Files.write(file.toPath(), ByteBuffer.allocate(Integer.BYTES).putInt(status.getStatus()).array());
+    }
+
+}
Index: core/src/test/scala/unit/kafka/log/LogUtils.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/test/scala/unit/kafka/log/LogUtils.scala	(revision 4b03d67e106c6d3b9dd465a308a62b400ead70a4)
+++ core/src/test/scala/unit/kafka/log/LogUtils.scala	(revision 46d0ebdcb27c17909f65fe7819fe1f4929de596c)
@@ -30,11 +30,15 @@
                     logDir: File,
                     indexIntervalBytes: Int = 10,
                     time: Time = Time.SYSTEM): LogSegment = {
-    val ms = FileRecords.open(Log.logFile(logDir, offset))
-    val idx = LazyIndex.forOffset(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
-    val timeIdx = LazyIndex.forTime(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
-    val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
+    val segDir = new File(logDir, String.valueOf(offset))
+    segDir.mkdirs()
+    val statusFile = new File(segDir, SegmentFile.STATUS.getName)
+    SegmentStatusHandler.setStatus(statusFile, SegmentStatus.HOT)
 
-    new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
+    val ms = FileRecords.open(new File(segDir, SegmentFile.LOG.getName))
+    val idx = LazyIndex.forOffset(new File(segDir, SegmentFile.OFFSET_INDEX.getName), offset, maxIndexSize = 1000)
+    val timeIdx = LazyIndex.forTime(new File(segDir, SegmentFile.TIME_INDEX.getName), offset, maxIndexSize = 1500)
+    val txnIndex = new TransactionIndex(offset, new File(segDir, SegmentFile.TXN_INDEX.getName))
+    new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time, statusFile)
   }
 }
Index: core/src/test/scala/unit/kafka/log/LogTest.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/test/scala/unit/kafka/log/LogTest.scala	(revision 46d0ebdcb27c17909f65fe7819fe1f4929de596c)
+++ core/src/test/scala/unit/kafka/log/LogTest.scala	(revision 55162c342efeed2febf90633e46851ea92f4fdc0)
@@ -727,7 +727,7 @@
       override def addSegment(segment: LogSegment): LogSegment = {
         val wrapper = new LogSegment(segment.log, segment.lazyOffsetIndex, segment.lazyTimeIndex, segment.txnIndex, segment.baseOffset,
-          segment.indexIntervalBytes, segment.rollJitterMs, mockTime) {
+          segment.indexIntervalBytes, segment.rollJitterMs, mockTime, null) {
           override def read(startOffset: Long, maxSize: Int, maxPosition: Long, minOneMessage: Boolean): FetchDataInfo = {
             segmentsWithReads += this
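
Note on the new on-disk layout (illustration only, not part of the patch): the change above replaces suffix-encoded segment state (".cleaned", ".swap", ".deleted") with one directory per segment, named after the base offset, whose lifecycle is recorded in a small "status" file holding a 4-byte status code. A minimal sketch of that mechanism follows, using the SegmentStatus, SegmentFile and SegmentStatusHandler types introduced in this patch; the segment directory path is hypothetical.

    import java.io.File
    import kafka.log.{SegmentFile, SegmentStatus, SegmentStatusHandler}

    object SegmentStatusSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical segment directory for base offset 42 of partition "topic-0".
        val segDir = new File("/tmp/kafka-logs/topic-0/42")
        segDir.mkdirs()
        val statusFile = new File(segDir, SegmentFile.STATUS.getName) // "status"

        // A freshly rolled segment is marked HOT; the handler persists the code as a 4-byte int.
        SegmentStatusHandler.setStatus(statusFile, SegmentStatus.HOT)

        // Recovery reads the status back instead of inspecting file-name suffixes.
        assert(SegmentStatusHandler.getStatus(statusFile) == SegmentStatus.HOT)

        // Deleting a segment becomes a status flip plus a later directory removal,
        // rather than a ".deleted" rename of each file.
        SegmentStatusHandler.setStatus(statusFile, SegmentStatus.DELETED)
      }
    }

The sketch only exercises the status-file round trip; in the patch itself these calls are made by LogSegment.open, Log.deleteSegment and the recovery paths in Log.loadSegments.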