diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index e4be8fc..959f3ec 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -58,19 +58,19 @@ object LogAppendInfo { /** * Struct to hold various quantities we compute about each message set before appending to the log * - * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending - * to the follower. - * @param lastOffset The last offset in the message set - * @param maxTimestamp The maximum timestamp of the message set. - * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp. - * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp - * @param logStartOffset The start offset of the log at the time of this append. - * @param recordConversionStats Statistics collected during record processing, `null` if `assignOffsets` is `false` - * @param sourceCodec The source codec used in the message set (send by the producer) - * @param targetCodec The target codec of the message set(after applying the broker compression configuration if any) - * @param shallowCount The number of shallow messages - * @param validBytes The number of valid bytes - * @param offsetsMonotonic Are the offsets in this message set monotonically increasing + * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending + * to the follower. 
+ * @param lastOffset The last offset in the message set + * @param maxTimestamp The maximum timestamp of the message set. + * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp. + * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp + * @param logStartOffset The start offset of the log at the time of this append. + * @param recordConversionStats Statistics collected during record processing, `null` if `assignOffsets` is `false` + * @param sourceCodec The source codec used in the message set (send by the producer) + * @param targetCodec The target codec of the message set(after applying the broker compression configuration if any) + * @param shallowCount The number of shallow messages + * @param validBytes The number of valid bytes + * @param offsetsMonotonic Are the offsets in this message set monotonically increasing * @param lastOffsetOfFirstBatch The last offset of the first batch */ case class LogAppendInfo(var firstOffset: Option[Long], @@ -96,6 +96,7 @@ case class LogAppendInfo(var firstOffset: Option[Long], /** * Get the (maximum) number of messages described by LogAppendInfo + * * @return Maximum possible number of messages described by LogAppendInfo */ def numMessages: Long = { @@ -110,11 +111,11 @@ case class LogAppendInfo(var firstOffset: Option[Long], * A class used to hold useful metadata about a completed transaction. This is used to build * the transaction index after appending to the log. * - * @param producerId The ID of the producer + * @param producerId The ID of the producer * @param firstOffset The first offset (inclusive) of the transaction - * @param lastOffset The last offset (inclusive) of the transaction. This is always the offset of the - * COMMIT/ABORT control record which indicates the transaction's completion. - * @param isAborted Whether or not the transaction was aborted + * @param lastOffset The last offset (inclusive) of the transaction. 
This is always the offset of the + * COMMIT/ABORT control record which indicates the transaction's completion. + * @param isAborted Whether or not the transaction was aborted */ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, isAborted: Boolean) { override def toString: String = { @@ -134,24 +135,24 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i * New log segments are created according to a configurable policy that controls the size in bytes or time interval * for a given segment. * - * @param dir The directory in which log segments are created. - * @param config The log configuration settings - * @param logStartOffset The earliest offset allowed to be exposed to kafka client. - * The logStartOffset can be updated by : + * @param dir The directory in which log segments are created. + * @param config The log configuration settings + * @param logStartOffset The earliest offset allowed to be exposed to kafka client. + * The logStartOffset can be updated by : * - user's DeleteRecordsRequest * - broker's log retention * - broker's log truncation - * The logStartOffset is used to decide the following: + * The logStartOffset is used to decide the following: * - Log deletion. LogSegment whose nextOffset <= log's logStartOffset can be deleted. - * It may trigger log rolling if the active segment is deleted. + * It may trigger log rolling if the active segment is deleted. * - Earliest offset of the log in response to ListOffsetRequest. To avoid OffsetOutOfRange exception after user seeks to earliest offset, - * we make sure that logStartOffset <= log's highWatermark - * Other activities such as log cleaning are not affected by logStartOffset. - * @param recoveryPoint The offset at which to begin recovery--i.e. 
the first offset which has not been flushed to disk - * @param scheduler The thread pool scheduler used for background actions - * @param brokerTopicStats Container for Broker Topic Yammer Metrics - * @param time The time instance used for checking the clock - * @param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired + * we make sure that logStartOffset <= log's highWatermark + * Other activities such as log cleaning are not affected by logStartOffset. + * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk + * @param scheduler The thread pool scheduler used for background actions + * @param brokerTopicStats Container for Broker Topic Yammer Metrics + * @param time The time instance used for checking the clock + * @param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired */ @threadsafe @@ -174,6 +175,10 @@ class Log(@volatile var dir: File, /* A lock that guards all modifications to the log */ private val lock = new Object + + /* A lock that guards all delete action to the log */ + private val deleteLock = new Object + // The memory mapped buffer for index files of this log will be closed for index files of this log will be closed with either delete() or closeHandlers() // After memory mapped buffer is closed, no disk IO operation should be performed for this log @volatile private var isMemoryMappedBufferClosed = false @@ -191,7 +196,7 @@ class Log(@volatile var dir: File, def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = { if ((updatedKeys.contains(LogConfig.RetentionMsProp) || updatedKeys.contains(LogConfig.MessageTimestampDifferenceMaxMsProp)) - && topicPartition.partition == 0 // generate warnings only for one partition of each topic + && topicPartition.partition 
== 0 // generate warnings only for one partition of each topic && newConfig.retentionMs < newConfig.messageTimestampDifferenceMaxMs) warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} is set to ${newConfig.retentionMs}. It is smaller than " + s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${newConfig.messageTimestampDifferenceMaxMs}. " + @@ -228,6 +233,7 @@ class Log(@volatile var dir: File, /* the actual segments of the log */ private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] + private val deletedSegments: ConcurrentNavigableMap[File, java.lang.Boolean] = new ConcurrentSkipListMap[File, java.lang.Boolean] @volatile private var _leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache() @@ -292,7 +298,7 @@ class Log(@volatile var dir: File, }, period = producerIdExpirationCheckIntervalMs, delay = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) /** The name of this log */ - def name = dir.getName() + def name = dir.getName() def leaderEpochCache = _leaderEpochCache @@ -308,43 +314,35 @@ class Log(@volatile var dir: File, * in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is higher than * the smallest offset .clean file could be part of an incomplete split operation. Such .swap files are also deleted * by this method. 
+ * * @return Set of .swap files that are valid to be swapped in as segment files */ private def removeTempFilesAndCollectSwapFiles(): Set[File] = { - def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = { - info(s"Deleting index files with suffix $suffix for baseFile $baseFile") - val offset = offsetFromFile(baseFile) - Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath) - Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath) - Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath) - } - var swapFiles = Set[File]() - var cleanFiles = Set[File]() + var cleanDirs = Set[File]() var minCleanedFileOffset = Long.MaxValue - for (file <- dir.listFiles if file.isFile) { - if (!file.canRead) - throw new IOException(s"Could not read file $file") - val filename = file.getName - if (filename.endsWith(DeletedFileSuffix)) { - debug(s"Deleting stray temporary file ${file.getAbsolutePath}") - Files.deleteIfExists(file.toPath) - } else if (filename.endsWith(CleanedFileSuffix)) { - minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset) - cleanFiles += file - } else if (filename.endsWith(SwapFileSuffix)) { + for (segDir <- dir.listFiles if LogSegment.isSegmentDir(segDir)) { + val baseOffset = LogSegment.getSegmentOffset(segDir) + if (!LogSegment.canReadSegment(segDir)) + throw new IOException(s"Could not read file $segDir") + val segmentStatus = LogSegment.getStatus(segDir) + if (segmentStatus == SegmentStatus.DELETED) { + debug(s"Deleting stray temporary file ${segDir.getAbsolutePath}") + LogSegment.deleteIfExists(segDir) + } else if (segmentStatus == SegmentStatus.CLEANED) { + minCleanedFileOffset = Math.min(baseOffset, minCleanedFileOffset) + cleanDirs += segDir + } else if (segmentStatus == SegmentStatus.SWAP) { // we crashed in the middle of a swap operation, to recover: // if a log, delete the index files, complete the swap operation later // if an index just delete the index 
files, they will be rebuilt - val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) - info(s"Found file ${file.getAbsolutePath} from interrupted swap operation.") - if (isIndexFile(baseFile)) { - deleteIndicesIfExist(baseFile) - } else if (isLogFile(baseFile)) { - deleteIndicesIfExist(baseFile) - swapFiles += file + info(s"Found file ${segDir.getAbsolutePath} from interrupted swap operation.") + info(s"Deleting index files from ${segDir.getAbsolutePath}") + LogSegment.deleteIndicesIfExist(segDir) + if (Log.logFile(segDir,baseOffset,"").exists()) { + swapFiles += segDir } } } @@ -352,21 +350,19 @@ class Log(@volatile var dir: File, // KAFKA-6264: Delete all .swap files whose base offset is greater than the minimum .cleaned segment offset. Such .swap // files could be part of an incomplete split operation that could not complete. See Log#splitOverflowedSegment // for more details about the split operation. - val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset) - invalidSwapFiles.foreach { file => - debug(s"Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset") - val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) - deleteIndicesIfExist(baseFile, SwapFileSuffix) - Files.deleteIfExists(file.toPath) + val (invalidSwapFiles, validSwapDirs) = swapFiles.partition(segDir => LogSegment.getSegmentOffset(segDir) >= minCleanedFileOffset) + invalidSwapFiles.foreach { segDir => + debug(s"Deleting invalid swap file ${segDir.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset") + LogSegment.deleteIfExists(segDir) } // Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files - cleanFiles.foreach { file => - debug(s"Deleting stray .clean file ${file.getAbsolutePath}") - Files.deleteIfExists(file.toPath) + cleanDirs.foreach { segDir => + debug(s"Deleting 
stray .clean file ${segDir.getAbsolutePath}") + LogSegment.deleteIfExists(segDir) } - validSwapFiles + validSwapDirs } /** @@ -374,49 +370,52 @@ class Log(@volatile var dir: File, * It is possible that we encounter a segment with index offset overflow in which case the LogSegmentOffsetOverflowException * will be thrown. Note that any segments that were opened before we encountered the exception will remain open and the * caller is responsible for closing them appropriately, if needed. + * * @throws LogSegmentOffsetOverflowException if the log directory contains a segment with messages that overflow the index offset */ private def loadSegmentFiles(): Unit = { // load segments in ascending order because transactional data from one segment may depend on the // segments that come before it - for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) { - if (isIndexFile(file)) { + for (segDir <- dir.listFiles if LogSegment.isSegmentDir(segDir)) { + val baseOffset = LogSegment.getSegmentOffset(segDir) + val logFile = Log.logFile(segDir, baseOffset) + if (!logFile.exists) { // if it is an index file, make sure it has a corresponding .log file - val offset = offsetFromFile(file) - val logFile = Log.logFile(dir, offset) - if (!logFile.exists) { - warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.") - Files.deleteIfExists(file.toPath) - } - } else if (isLogFile(file)) { - // if it's a log file, load the corresponding log segment - val baseOffset = offsetFromFile(file) - val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists() - val segment = LogSegment.open(dir = dir, - baseOffset = baseOffset, - config, - time = time, - fileAlreadyExists = true) - - try segment.sanityCheck(timeIndexFileNewlyCreated) - catch { - case _: NoSuchFileException => - error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " + - "recovering segment and rebuilding index files...") - 
recoverSegment(segment) - case e: CorruptIndexException => - warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " + - s"to ${e.getMessage}}, recovering segment and rebuilding index files...") - recoverSegment(segment) + warn(s"Found an orphaned index file ${segDir.getAbsolutePath}, with no corresponding log file.") + LogSegment.deleteIfExists(segDir) + } else { + val status = LogSegment.getStatus(segDir) + if (status == SegmentStatus.HOT) { + // if it's a log file, load the corresponding log segment + val timeIndexFileNewlyCreated = !Log.timeIndexFile(segDir, baseOffset).exists() + val segment = LogSegment.open(segDir = segDir, + baseOffset = baseOffset, + config, + time = time, + fileAlreadyExists = true) + try segment.sanityCheck(timeIndexFileNewlyCreated) + catch { + case _: NoSuchFileException => + error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " + + "recovering segment and rebuilding index files...") + recoverSegment(segment) + case e: CorruptIndexException => + warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " + + s"to ${e.getMessage}}, recovering segment and rebuilding index files...") + recoverSegment(segment) + } + addSegment(segment) + } else if (status == SegmentStatus.DELETED) { + deletedSegments.put(segDir, java.lang.Boolean.TRUE) } - addSegment(segment) } } } /** * Recover the given segment. 
- * @param segment Segment to recover + * + * @param segment Segment to recover * @param leaderEpochCache Optional cache for updating the leader epoch during recovery * @return The number of bytes truncated from the segment * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow @@ -435,6 +434,7 @@ class Log(@volatile var dir: File, /** * This method does not need to convert IOException to KafkaStorageException because it is only called before all logs * are loaded. + * * @throws LogSegmentOffsetOverflowException if the swap file contains messages that cause the log segment offset to * overflow. Note that this is currently a fatal exception as we do not have * a way to deal with it. The exception is propagated all the way up to @@ -442,16 +442,14 @@ class Log(@volatile var dir: File, * this situation. This is expected to be an extremely rare scenario in practice, * and manual intervention might be required to get out of it. */ - private def completeSwapOperations(swapFiles: Set[File]): Unit = { - for (swapFile <- swapFiles) { - val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, "")) - val baseOffset = offsetFromFile(logFile) - val swapSegment = LogSegment.open(swapFile.getParentFile, + private def completeSwapOperations(segDirs: Set[File]): Unit = { + for (segDir <- segDirs) { + val baseOffset = LogSegment.getSegmentOffset(segDir) + val swapSegment = LogSegment.open(segDir = segDir, baseOffset = baseOffset, config, - time = time, - fileSuffix = SwapFileSuffix) - info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.") + time = time) + info(s"Found log file ${segDir.getPath} from interrupted swap operation, repairing.") recoverSegment(swapSegment) // We create swap files for two cases: @@ -473,13 +471,14 @@ class Log(@volatile var dir: File, * Load the log segments from the log files on disk and return the next offset. 
* This method does not need to convert IOException to KafkaStorageException because it is only called before all logs * are loaded. + * * @throws LogSegmentOffsetOverflowException if we encounter a .swap file with messages that overflow index offset; or when * we find an unexpected number of .log files with overflow */ private def loadSegments(): Long = { // first do a pass through the files in the log directory and remove any temporary files // and find any interrupted swap operations - val swapFiles = removeTempFilesAndCollectSwapFiles() + val swapDirs = removeTempFilesAndCollectSwapFiles() // Now do a second pass and load all the log and index files. // We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When @@ -496,11 +495,11 @@ class Log(@volatile var dir: File, // Finally, complete any interrupted swap operations. To be crash-safe, // log files that are replaced by the swap segment should be renamed to .deleted // before the swap file is restored as the new segment file. - completeSwapOperations(swapFiles) + completeSwapOperations(swapDirs) if (logSegments.isEmpty) { // no existing segments, create a new mutable segment beginning at offset 0 - addSegment(LogSegment.open(dir = dir, + addSegment(LogSegment.open(segDir = LogSegment.getSegmentDir(dir, logStartOffset), baseOffset = 0, config, time = time, @@ -508,7 +507,7 @@ class Log(@volatile var dir: File, initFileSize = this.initFileSize, preallocate = config.preallocate)) 0 - } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) { + } else if (!(LogSegment.getStatus(dir) == SegmentStatus.DELETED)) { val nextOffset = retryOnOffsetOverflow { recoverLog() } @@ -527,6 +526,7 @@ class Log(@volatile var dir: File, * Recover the log segments and return the next offset after recovery. * This method does not need to convert IOException to KafkaStorageException because it is only called before all * logs are loaded. 
+ * * @throws LogSegmentOffsetOverflowException if we encountered a legacy segment with offset overflow */ private def recoverLog(): Long = { @@ -588,7 +588,7 @@ class Log(@volatile var dir: File, // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state // from the first segment. if (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 || - (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) { + (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) { // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the // last two segments and the last offset. This should avoid the full scan in the case that the log needs // truncation. @@ -713,7 +713,7 @@ class Log(@volatile var dir: File, /** * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs * - * @param records The records to append + * @param records The records to append * @param isFromClient Whether or not this append is from a producer * @throws KafkaStorageException If the append fails due to an I/O error. * @return Information about the appended messages including the first and last offset. @@ -739,12 +739,12 @@ class Log(@volatile var dir: File, * This method will generally be responsible for assigning offsets to the messages, * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid. 
* - * @param records The log records to append - * @param isFromClient Whether or not this append is from a producer + * @param records The log records to append + * @param isFromClient Whether or not this append is from a producer * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given - * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader - * @throws KafkaStorageException If the append fails due to an I/O error. - * @throws OffsetsOutOfOrderException If out of order offsets found in 'records' + * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader + * @throws KafkaStorageException If the append fails due to an I/O error. + * @throws OffsetsOutOfOrderException If out of order offsets found in 'records' * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset * @return Information about the appended messages including the first and last offset. */ @@ -810,7 +810,7 @@ class Log(@volatile var dir: File, // we are taking the offsets we are given if (!appendInfo.offsetsMonotonic) throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " + - records.records.asScala.map(_.offset)) + records.records.asScala.map(_.offset)) if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) { // we may still be able to recover if the log is empty @@ -824,9 +824,9 @@ class Log(@volatile var dir: File, val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch" throw new UnexpectedAppendOffsetException( s"Unexpected offset in append to $topicPartition. $firstOrLast " + - s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. 
" + - s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" + - s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset", + s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " + + s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" + + s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset", firstOffset, appendInfo.lastOffset) } } @@ -917,7 +917,7 @@ class Log(@volatile var dir: File, case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly || logOffsetMetadata.messageOffset < logStartOffset => val offset = math.max(logOffsetMetadata.messageOffset, logStartOffset) val segment = segments.floorEntry(offset).getValue - val position = segment.translateOffset(offset) + val position = segment.translateOffset(offset) Some(LogOffsetMetadata(offset, segment.baseOffset, position.position)) case other => other } @@ -1069,7 +1069,7 @@ class Log(@volatile var dir: File, * Trim any invalid bytes from the end of this message set (if there are any) * * @param records The records to trim - * @param info The general information of the message set + * @param info The general information of the message set * @return A trimmed message set. This may be the same as what was passed in or it may not. */ private def trimInvalidBytes(records: MemoryRecords, info: LogAppendInfo): MemoryRecords = { @@ -1095,10 +1095,10 @@ class Log(@volatile var dir: File, /** * Read messages from the log. * - * @param startOffset The offset to begin reading at - * @param maxLength The maximum number of bytes to read - * @param maxOffset The offset to read up to, exclusive. (i.e. 
this offset NOT included in the resulting message set) - * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists) + * @param startOffset The offset to begin reading at + * @param maxLength The maximum number of bytes to read + * @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set) + * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists) * @param isolationLevel The isolation level of the fetcher. The READ_UNCOMMITTED isolation level has the traditional * read semantics (e.g. consumers are limited to fetching up to the high watermark). In * READ_COMMITTED, consumers are limited to fetching up to the last stable offset. Additionally, @@ -1149,9 +1149,9 @@ class Log(@volatile var dir: File, // Check the segment again in case a new segment has just rolled out. if (segmentEntry != segments.lastEntry) // New log segment has rolled out, we can read up to the file end. 
- segment.size + segment.size else - exposedPos + exposedPos } else { segment.size } @@ -1177,7 +1177,9 @@ class Log(@volatile var dir: File, private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = { val segmentEntry = segments.floorEntry(startOffset) val allAbortedTxns = ListBuffer.empty[AbortedTxn] + def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns + collectAbortedTransactions(logStartOffset, upperBoundOffset, segmentEntry, accumulator) allAbortedTxns.toList } @@ -1196,7 +1198,9 @@ class Log(@volatile var dir: File, } val abortedTransactions = ListBuffer.empty[AbortedTransaction] + def accumulator(abortedTxns: List[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction) + collectAbortedTransactions(startOffset, upperBoundOffset, segmentEntry, accumulator) FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata, @@ -1371,6 +1375,7 @@ class Log(@volatile var dir: File, private def deleteRetentionSizeBreachedSegments(): Int = { if (config.retentionSize < 0 || size < config.retentionSize) return 0 var diff = size - config.retentionSize + def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = { if (diff - segment.size >= 0) { diff -= segment.size @@ -1390,7 +1395,9 @@ class Log(@volatile var dir: File, deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach") } - def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix) +// def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix) + + def isFuture: Boolean = LogSegment.getStatus(dir) == SegmentStatus.FUTURE /** * The size of the log in bytes @@ -1411,14 +1418,14 @@ class Log(@volatile var dir: File, * Roll the log over to a new empty log segment if necessary. * * @param messagesSize The messages set size in bytes. - * @param appendInfo log append information - * logSegment will be rolled if one of the following conditions met - *
    - *
  1. The logSegment is full - *
  2. The maxTime has elapsed since the timestamp of first message in the segment (or since the create time if - * the first message does not have a timestamp) - *
  3. The index is full - *
+ * @param appendInfo log append information + * logSegment will be rolled if one of the following conditions met + *
    + *
  1. The logSegment is full + *
  2. The maxTime has elapsed since the timestamp of first message in the segment (or since the create time if + * the first message does not have a timestamp) + *
  3. The index is full + *
* @return The currently active segment after (perhaps) rolling to a new segment */ private def maybeRoll(messagesSize: Int, appendInfo: LogAppendInfo): LogSegment = { @@ -1467,13 +1474,10 @@ class Log(@volatile var dir: File, lock synchronized { checkIfMemoryMappedBufferClosed() val newOffset = math.max(expectedNextOffset, logEndOffset) - val logFile = Log.logFile(dir, newOffset) - val offsetIdxFile = offsetIndexFile(dir, newOffset) - val timeIdxFile = timeIndexFile(dir, newOffset) - val txnIdxFile = transactionIndexFile(dir, newOffset) - for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) { - warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first") - Files.delete(file.toPath) + val newSegDir = new File(dir, String.valueOf(newOffset)) + if (newSegDir.exists()) { + warn(s"Newly rolled segment ${newSegDir.getAbsolutePath} already exists; deleting it first") + LogSegment.deleteIfExists(newSegDir) } Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment()) @@ -1486,7 +1490,7 @@ class Log(@volatile var dir: File, producerStateManager.updateMapEndOffset(newOffset) producerStateManager.takeSnapshot() - val segment = LogSegment.open(dir, + val segment = LogSegment.open(LogSegment.getSegmentDir(dir, newOffset), baseOffset = newOffset, config, time = time, @@ -1525,7 +1529,7 @@ class Log(@volatile var dir: File, * * @param offset The offset to flush up to (non-inclusive); the new recovery point */ - def flush(offset: Long) : Unit = { + def flush(offset: Long): Unit = { maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") { if (offset <= this.recoveryPoint) return @@ -1655,9 +1659,9 @@ class Log(@volatile var dir: File, } /** - * Delete all data in the log and start at the new offset + * Delete all data in the log and start at the new offset * - * @param newOffset The new offset to start the log with + * @param newOffset The new 
offset to start the log with */ private[log] def truncateFullyAndStartAt(newOffset: Long) { maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") { @@ -1666,7 +1670,7 @@ class Log(@volatile var dir: File, checkIfMemoryMappedBufferClosed() val segmentsToDelete = logSegments.toList segmentsToDelete.foreach(deleteSegment) - addSegment(LogSegment.open(dir, + addSegment(LogSegment.open(LogSegment.getSegmentDir(dir, newOffset), baseOffset = newOffset, config = config, time = time, @@ -1719,9 +1723,9 @@ class Log(@volatile var dir: File, /** * This method performs an asynchronous log segment delete by doing the following: *
    - *
  1. It removes the segment from the segment map so that it will no longer be used for reads. - *
  2. It renames the index and log files by appending .deleted to the respective file name - *
  3. It schedules an asynchronous delete operation to occur in the future + *
  4. It removes the segment from the segment map so that it will no longer be used for reads. + *
  5. It renames the index and log files by appending .deleted to the respective file name + *
  6. It schedules an asynchronous delete operation to occur in the future *
* This allows reads to happen concurrently without synchronization and without the possibility of physically * deleting a file while it is being read from. @@ -1735,7 +1739,8 @@ class Log(@volatile var dir: File, info(s"Scheduling log segment [baseOffset ${segment.baseOffset}, size ${segment.size}] for deletion.") lock synchronized { segments.remove(segment.baseOffset) - asyncDeleteSegment(segment) + segment.closeHandlers() + asyncDeleteSegment(segment.segDir) } } @@ -1747,14 +1752,26 @@ class Log(@volatile var dir: File, * * @throws IOException if the file can't be renamed and still exists */ - private def asyncDeleteSegment(segment: LogSegment) { - segment.changeFileSuffixes("", Log.DeletedFileSuffix) + def asyncDeleteSegment(segDir: File) { + def deleteSeg() { - info(s"Deleting segment ${segment.baseOffset}") - maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") { - segment.deleteIfExists() + deleteLock synchronized { + val deletedFiles = deletedSegments.keySet().iterator() + while (deletedFiles.hasNext) { + val delFile = deletedFiles.next() + info(s"Deleting segment ${topicPartition.toString} : $delFile") + try { + LogSegment.deleteIfExists(delFile) + deletedSegments.remove(delFile) + } catch { + case e: Throwable => warn(s"Unable to delete segment ${topicPartition.toString} : $delFile, Reason : ${e.getClass.getName}") + } + } } } + + LogSegment.setStatus(segDir, SegmentStatus.DELETED) + this.deletedSegments.put(segDir, java.lang.Boolean.TRUE) scheduler.schedule("delete-file", deleteSeg _, delay = config.fileDeleteDelayMs) } @@ -1767,27 +1784,27 @@ class Log(@volatile var dir: File, * * The sequence of operations is: *
    - *
  1. Cleaner creates one or more new segments with suffix .cleaned and invokes replaceSegments(). - * If broker crashes at this point, the clean-and-swap operation is aborted and - * the .cleaned files are deleted on recovery in loadSegments(). - *
  2. New segments are renamed .swap. If the broker crashes before all segments were renamed to .swap, the - * clean-and-swap operation is aborted - .cleaned as well as .swap files are deleted on recovery in - * loadSegments(). We detect this situation by maintaining a specific order in which files are renamed from - * .cleaned to .swap. Basically, files are renamed in descending order of offsets. On recovery, all .swap files - * whose offset is greater than the minimum-offset .clean file are deleted. - *
  3. If the broker crashes after all new segments were renamed to .swap, the operation is completed, the swap - * operation is resumed on recovery as described in the next step. - *
  4. Old segment files are renamed to .deleted and asynchronous delete is scheduled. - * If the broker crashes, any .deleted files left behind are deleted on recovery in loadSegments(). - * replaceSegments() is then invoked to complete the swap with newSegment recreated from - * the .swap file and oldSegments containing segments which were not renamed before the crash. - *
  5. Swap segment(s) are renamed to replace the existing segments, completing this operation. - * If the broker crashes, any .deleted files which may be left behind are deleted - * on recovery in loadSegments(). + *
  6. Cleaner creates one or more new segments with suffix .cleaned and invokes replaceSegments(). + * If broker crashes at this point, the clean-and-swap operation is aborted and + * the .cleaned files are deleted on recovery in loadSegments(). + *
  7. New segments are renamed .swap. If the broker crashes before all segments were renamed to .swap, the + * clean-and-swap operation is aborted - .cleaned as well as .swap files are deleted on recovery in + * loadSegments(). We detect this situation by maintaining a specific order in which files are renamed from + * .cleaned to .swap. Basically, files are renamed in descending order of offsets. On recovery, all .swap files + * whose offset is greater than the minimum-offset .clean file are deleted. + *
  8. If the broker crashes after all new segments were renamed to .swap, the operation is completed, the swap + * operation is resumed on recovery as described in the next step. + *
  9. Old segment files are renamed to .deleted and asynchronous delete is scheduled. + * If the broker crashes, any .deleted files left behind are deleted on recovery in loadSegments(). + * replaceSegments() is then invoked to complete the swap with newSegment recreated from + * the .swap file and oldSegments containing segments which were not renamed before the crash. + *
  10. Swap segment(s) are renamed to replace the existing segments, completing this operation. + * If the broker crashes, any .deleted files which may be left behind are deleted + * on recovery in loadSegments(). *
* - * @param newSegments The new log segment to add to the log - * @param oldSegments The old log segments to delete from the log + * @param newSegments The new log segment to add to the log + * @param oldSegments The old log segments to delete from the log * @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash */ private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) { @@ -1799,7 +1816,7 @@ class Log(@volatile var dir: File, // need to do this in two phases to be crash safe AND do the delete asynchronously // if we crash in the middle of this we complete the swap in loadSegments() if (!isRecoveredSwapFile) - sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)) + sortedNewSegments.reverse.foreach(_.changeSegmentStatus(SegmentStatus.CLEANED, SegmentStatus.SWAP)) sortedNewSegments.reverse.foreach(addSegment(_)) // delete the old files @@ -1807,11 +1824,12 @@ class Log(@volatile var dir: File, // remove the index entry if (seg.baseOffset != sortedNewSegments.head.baseOffset) segments.remove(seg.baseOffset) + seg.closeHandlers() // delete segment - asyncDeleteSegment(seg) + asyncDeleteSegment(seg.segDir) } // okay we are safe now, remove the swap suffix - sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, "")) + sortedNewSegments.foreach(_.changeSegmentStatus(SegmentStatus.SWAP, SegmentStatus.HOT)) } } @@ -1827,6 +1845,7 @@ class Log(@volatile var dir: File, /** * Add the given segment to the segments in this log. If this segment replaces an existing segment, delete it. + * * @param segment The segment to add */ @threadsafe @@ -1865,11 +1884,12 @@ class Log(@volatile var dir: File, *

The split logic overloads the use of .clean files that LogCleaner typically uses to make the process of replacing * the input segment with multiple new segments atomic and recoverable in the event of a crash. See replaceSegments * and completeSwapOperations for the implementation to make this operation recoverable on crashes.

+ * * @param segment Segment to split * @return List of new segments that replace the input segment */ private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = { - require(isLogFile(segment.log.file), s"Cannot split file ${segment.log.file.getAbsoluteFile}") + require(segment.getSegmentStatus() == SegmentStatus.HOT, s"Cannot split file ${segment.log.file.getAbsoluteFile}") require(segment.hasOverflow, "Split operation is only permitted for segments with overflow") info(s"Splitting overflowed segment $segment") @@ -1975,9 +1995,13 @@ object Log { time: Time = Time.SYSTEM, maxProducerIdExpirationMs: Int, producerIdExpirationCheckIntervalMs: Int, - logDirFailureChannel: LogDirFailureChannel): Log = { + logDirFailureChannel: LogDirFailureChannel, + isFuture: Boolean): Log = { val topicPartition = Log.parseTopicPartitionName(dir) val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs) + if (isFuture) { + SegmentStatusHandler.setStatus(new File(dir, SegmentFile.STATUS.getName), SegmentStatus.FUTURE) + } new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager, logDirFailureChannel) } @@ -2000,12 +2024,12 @@ object Log { /** * Construct a log file name in the given dir with the given base offset and the given suffix * - * @param dir The directory in which the log will reside + * @param dir The directory in which the log will reside * @param offset The base offset of the log file * @param suffix The suffix to be appended to the file name (e.g. "", ".deleted", ".cleaned", ".swap", etc.) */ def logFile(dir: File, offset: Long, suffix: String = ""): File = - new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix + suffix) + new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix) /** * Return a directory name to rename the log directory to for async deletion. 
The name will be in the following @@ -2039,22 +2063,22 @@ object Log { /** * Construct an index file name in the given dir using the given base offset and the given suffix * - * @param dir The directory in which the log will reside + * @param dir The directory in which the log will reside * @param offset The base offset of the log file * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.) */ def offsetIndexFile(dir: File, offset: Long, suffix: String = ""): File = - new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix + suffix) + new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix) /** * Construct a time index file name in the given dir using the given base offset and the given suffix * - * @param dir The directory in which the log will reside + * @param dir The directory in which the log will reside * @param offset The base offset of the log file * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.) */ def timeIndexFile(dir: File, offset: Long, suffix: String = ""): File = - new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix + suffix) + new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix) def deleteFileIfExists(file: File, suffix: String = ""): Unit = Files.deleteIfExists(new File(file.getPath + suffix).toPath) @@ -2062,7 +2086,7 @@ object Log { /** * Construct a producer id snapshot file using the given offset. 
* - * @param dir The directory in which the log will reside + * @param dir The directory in which the log will reside * @param offset The last offset (exclusive) included in the snapshot */ def producerSnapshotFile(dir: File, offset: Long): File = @@ -2071,12 +2095,12 @@ object Log { /** * Construct a transaction index file name in the given dir using the given base offset and the given suffix * - * @param dir The directory in which the log will reside + * @param dir The directory in which the log will reside * @param offset The base offset of the log file * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.) */ def transactionIndexFile(dir: File, offset: Long, suffix: String = ""): File = - new File(dir, filenamePrefixFromOffset(offset) + TxnIndexFileSuffix + suffix) + new File(dir, filenamePrefixFromOffset(offset) + TxnIndexFileSuffix) def offsetFromFileName(filename: String): Long = { filename.substring(0, filename.indexOf('.')).toLong @@ -2112,7 +2136,7 @@ object Log { if (dirName == null || dirName.isEmpty || !dirName.contains('-')) throw exception(dir) if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches || - dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches) + dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches) throw exception(dir) val name: String = @@ -2127,17 +2151,10 @@ object Log { val partition = try partitionString.toInt - catch { case _: NumberFormatException => throw exception(dir) } + catch { + case _: NumberFormatException => throw exception(dir) + } new TopicPartition(topic, partition) } - - private def isIndexFile(file: File): Boolean = { - val filename = file.getName - filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix) || filename.endsWith(TxnIndexFileSuffix) - } - - private def isLogFile(file: File): Boolean = - file.getPath.endsWith(LogFileSuffix) - } diff --git 
a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 91ddbf0..3b7c538 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -72,28 +72,27 @@ import scala.collection.{Set, mutable} * are the key points: * * 1. In order to maintain sequence number continuity for active producers, we always retain the last batch - * from each producerId, even if all the records from the batch have been removed. The batch will be removed - * once the producer either writes a new batch or is expired due to inactivity. + * from each producerId, even if all the records from the batch have been removed. The batch will be removed + * once the producer either writes a new batch or is expired due to inactivity. * 2. We do not clean beyond the last stable offset. This ensures that all records observed by the cleaner have - * been decided (i.e. committed or aborted). In particular, this allows us to use the transaction index to - * collect the aborted transactions ahead of time. + * been decided (i.e. committed or aborted). In particular, this allows us to use the transaction index to + * collect the aborted transactions ahead of time. * 3. Records from aborted transactions are removed by the cleaner immediately without regard to record keys. * 4. 
Transaction markers are retained until all record batches from the same transaction have been removed and - * a sufficient amount of time has passed to reasonably ensure that an active consumer wouldn't consume any - * data from the transaction prior to reaching the offset of the marker. This follows the same logic used for - * tombstone deletion. + * a sufficient amount of time has passed to reasonably ensure that an active consumer wouldn't consume any + * data from the transaction prior to reaching the offset of the marker. This follows the same logic used for + * tombstone deletion. * * @param initialConfig Initial configuration parameters for the cleaner. Actual config may be dynamically updated. - * @param logDirs The directories where offset checkpoints reside - * @param logs The pool of logs - * @param time A way to control the passage of time + * @param logDirs The directories where offset checkpoints reside + * @param logs The pool of logs + * @param time A way to control the passage of time */ class LogCleaner(initialConfig: CleanerConfig, val logDirs: Seq[File], val logs: Pool[TopicPartition, Log], val logDirFailureChannel: LogDirFailureChannel, - time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup with BrokerReconfigurable -{ + time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup with BrokerReconfigurable { /* Log cleaner configuration which may be dynamically updated */ @volatile private var config = initialConfig @@ -103,34 +102,34 @@ class LogCleaner(initialConfig: CleanerConfig, /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */ private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, - checkIntervalMs = 300, - throttleDown = true, - "cleaner-io", - "bytes", - time = time) + checkIntervalMs = 300, + throttleDown = true, + "cleaner-io", + "bytes", + time = time) /* the threads */ private val cleaners = mutable.ArrayBuffer[CleanerThread]() /* a 
metric to track the maximum utilization of any thread's buffer in the last cleaning */ newGauge("max-buffer-utilization-percent", - new Gauge[Int] { - def value: Int = cleaners.map(_.lastStats).map(100 * _.bufferUtilization).max.toInt - }) + new Gauge[Int] { + def value: Int = cleaners.map(_.lastStats).map(100 * _.bufferUtilization).max.toInt + }) /* a metric to track the recopy rate of each thread's last cleaning */ newGauge("cleaner-recopy-percent", - new Gauge[Int] { - def value: Int = { - val stats = cleaners.map(_.lastStats) - val recopyRate = stats.map(_.bytesWritten).sum.toDouble / math.max(stats.map(_.bytesRead).sum, 1) - (100 * recopyRate).toInt - } - }) + new Gauge[Int] { + def value: Int = { + val stats = cleaners.map(_.lastStats) + val recopyRate = stats.map(_.bytesWritten).sum.toDouble / math.max(stats.map(_.bytesRead).sum, 1) + (100 * recopyRate).toInt + } + }) /* a metric to track the maximum cleaning time for the last cleaning from each thread */ newGauge("max-clean-time-secs", - new Gauge[Int] { - def value: Int = cleaners.map(_.lastStats).map(_.elapsedSecs).max.toInt - }) + new Gauge[Int] { + def value: Int = cleaners.map(_.lastStats).map(_.elapsedSecs).max.toInt + }) /** * Start the background cleaning @@ -171,9 +170,9 @@ class LogCleaner(initialConfig: CleanerConfig, } /** - * Reconfigure log clean config. This simply stops current log cleaners and creates new ones. - * That ensures that if any of the cleaners had failed, new cleaners are created to match the new config. - */ + * Reconfigure log clean config. This simply stops current log cleaners and creates new ones. + * That ensures that if any of the cleaners had failed, new cleaners are created to match the new config. 
+ */ override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { config = LogCleaner.cleanerConfig(newConfig) shutdown() @@ -181,8 +180,8 @@ class LogCleaner(initialConfig: CleanerConfig, } /** - * Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of - * the partition is aborted. + * Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of + * the partition is aborted. */ def abortCleaning(topicPartition: TopicPartition) { cleanerManager.abortCleaning(topicPartition) @@ -192,7 +191,7 @@ class LogCleaner(initialConfig: CleanerConfig, * Update checkpoint file, removing topics and partitions that no longer exist */ def updateCheckpoints(dataDir: File) { - cleanerManager.updateCheckpoints(dataDir, update=None) + cleanerManager.updateCheckpoints(dataDir, update = None) } def alterCheckpointDir(topicPartition: TopicPartition, sourceLogDir: File, destLogDir: File): Unit = { @@ -211,15 +210,15 @@ class LogCleaner(initialConfig: CleanerConfig, } /** - * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition. - * This call blocks until the cleaning of the partition is aborted and paused. + * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition. + * This call blocks until the cleaning of the partition is aborted and paused. */ def abortAndPauseCleaning(topicPartition: TopicPartition) { cleanerManager.abortAndPauseCleaning(topicPartition) } /** - * Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed. + * Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed. 
*/ def resumeCleaning(topicPartition: TopicPartition) { cleanerManager.resumeCleaning(topicPartition) @@ -230,13 +229,13 @@ class LogCleaner(initialConfig: CleanerConfig, * cleaner has processed up to the given offset on the specified topic/partition * * @param topicPartition The topic and partition to be cleaned - * @param offset The first dirty offset that the cleaner doesn't have to clean - * @param maxWaitMs The maximum time in ms to wait for cleaner - * + * @param offset The first dirty offset that the cleaner doesn't have to clean + * @param maxWaitMs The maximum time in ms to wait for cleaner * @return A boolean indicating whether the work has completed before timeout */ def awaitCleaned(topicPartition: TopicPartition, offset: Long, maxWaitMs: Long = 60000L): Boolean = { def isCleaned = cleanerManager.allCleanerCheckpoints.get(topicPartition).fold(false)(_ >= offset) + var remainingWaitMs = maxWaitMs while (!isCleaned && remainingWaitMs > 0) { val sleepTime = math.min(100, remainingWaitMs) @@ -265,14 +264,14 @@ class LogCleaner(initialConfig: CleanerConfig, warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...") val cleaner = new Cleaner(id = threadId, - offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt, - hashAlgorithm = config.hashAlgorithm), - ioBufferSize = config.ioBufferSize / config.numThreads / 2, - maxIoBufferSize = config.maxMessageSize, - dupBufferLoadFactor = config.dedupeBufferLoadFactor, - throttler = throttler, - time = time, - checkDone = checkDone) + offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt, + hashAlgorithm = config.hashAlgorithm), + ioBufferSize = config.ioBufferSize / config.numThreads / 2, + maxIoBufferSize = config.maxMessageSize, + dupBufferLoadFactor = config.dedupeBufferLoadFactor, + throttler = throttler, + time = time, + checkDone = checkDone) 
@volatile var lastStats: CleanerStats = new CleanerStats() @@ -315,7 +314,7 @@ class LogCleaner(initialConfig: CleanerConfig, true } val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs() - deletable.foreach{ + deletable.foreach { case (topicPartition, log) => try { log.deleteOldSegments() @@ -332,24 +331,26 @@ class LogCleaner(initialConfig: CleanerConfig, */ def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) { this.lastStats = stats - def mb(bytes: Double) = bytes / (1024*1024) + + def mb(bytes: Double) = bytes / (1024 * 1024) + val message = "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) + - "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead), - stats.elapsedSecs, - mb(stats.bytesRead/stats.elapsedSecs)) + - "\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead), - stats.elapsedIndexSecs, - mb(stats.mapBytesRead)/stats.elapsedIndexSecs, - 100 * stats.elapsedIndexSecs/stats.elapsedSecs) + - "\tBuffer utilization: %.1f%%%n".format(100 * stats.bufferUtilization) + - "\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead), - stats.elapsedSecs - stats.elapsedIndexSecs, - mb(stats.bytesRead)/(stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs).toDouble/stats.elapsedSecs) + - "\tStart size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesRead), stats.messagesRead) + - "\tEnd size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesWritten), stats.messagesWritten) + - "\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead), - 100.0 * (1.0 - stats.messagesWritten.toDouble/stats.messagesRead)) + "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead), + stats.elapsedSecs, + mb(stats.bytesRead 
/ stats.elapsedSecs)) + + "\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead), + stats.elapsedIndexSecs, + mb(stats.mapBytesRead) / stats.elapsedIndexSecs, + 100 * stats.elapsedIndexSecs / stats.elapsedSecs) + + "\tBuffer utilization: %.1f%%%n".format(100 * stats.bufferUtilization) + + "\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead), + stats.elapsedSecs - stats.elapsedIndexSecs, + mb(stats.bytesRead) / (stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs).toDouble / stats.elapsedSecs) + + "\tStart size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesRead), stats.messagesRead) + + "\tEnd size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesWritten), stats.messagesWritten) + + "\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble / stats.bytesRead), + 100.0 * (1.0 - stats.messagesWritten.toDouble / stats.messagesRead)) info(message) if (stats.invalidMessagesRead > 0) { warn("\tFound %d invalid messages during compaction.".format(stats.invalidMessagesRead)) @@ -357,6 +358,7 @@ class LogCleaner(initialConfig: CleanerConfig, } } + } object LogCleaner { @@ -383,22 +385,28 @@ object LogCleaner { } def createNewCleanedSegment(log: Log, baseOffset: Long): LogSegment = { - LogSegment.deleteIfExists(log.dir, baseOffset, fileSuffix = Log.CleanedFileSuffix) - LogSegment.open(log.dir, baseOffset, log.config, Time.SYSTEM, fileAlreadyExists = false, - fileSuffix = Log.CleanedFileSuffix, initFileSize = log.initFileSize, preallocate = log.config.preallocate) + val segDir = LogSegment.getSegmentDir(log.dir, baseOffset, randomDigits = true) + val tobeRemoved = LogSegment.getCleanedSegmentFiles(log.dir, baseOffset) + if (tobeRemoved != null) { + tobeRemoved.foreach(segDir => log.asyncDeleteSegment(segDir)) + } + val segment = LogSegment.open(segDir, baseOffset, log.config, Time.SYSTEM, 
fileAlreadyExists = false, + initFileSize = log.initFileSize, preallocate = log.config.preallocate, segmentStatus = SegmentStatus.CLEANED) + segment } } /** * This class holds the actual logic for cleaning a log - * @param id An identifier used for logging - * @param offsetMap The map used for deduplication - * @param ioBufferSize The size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer. - * @param maxIoBufferSize The maximum size of a message that can appear in the log + * + * @param id An identifier used for logging + * @param offsetMap The map used for deduplication + * @param ioBufferSize The size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer. + * @param maxIoBufferSize The maximum size of a message that can appear in the log * @param dupBufferLoadFactor The maximum percent full for the deduplication buffer - * @param throttler The throttler instance to use for limiting I/O rate. - * @param time The time instance - * @param checkDone Check if the cleaning for a partition is finished or aborted. + * @param throttler The throttler instance to use for limiting I/O rate. + * @param time The time instance + * @param checkDone Check if the cleaning for a partition is finished or aborted. 
*/ private[log] class Cleaner(val id: Int, val offsetMap: OffsetMap, @@ -427,16 +435,15 @@ private[log] class Cleaner(val id: Int, * Clean the given log * * @param cleanable The log to be cleaned - * * @return The first offset not cleaned and the statistics for this round of cleaning */ private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = { // figure out the timestamp below which it is safe to remove delete tombstones // this position is defined to be a configurable time beneath the last modified time of the last clean segment val deleteHorizonMs = - cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { - case None => 0L - case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs + cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { + case None => 0L + case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs } doClean(cleanable, deleteHorizonMs) @@ -475,11 +482,11 @@ private[log] class Cleaner(val id: Int, /** * Clean a group of segments into a single replacement segment * - * @param log The log being cleaned - * @param segments The group of segments being cleaned - * @param map The offset map to use for cleaning segments + * @param log The log being cleaned + * @param segments The group of segments being cleaned + * @param map The offset map to use for cleaning segments * @param deleteHorizonMs The time to retain delete tombstones - * @param stats Collector for cleaning statistics + * @param stats Collector for cleaning statistics */ private[log] def cleanSegments(log: Log, segments: Seq[LogSegment], @@ -504,7 +511,7 @@ private[log] class Cleaner(val id: Int, val retainDeletes = currentSegment.lastModified > deleteHorizonMs info(s"Cleaning segment $startOffset in log ${log.name} (largest timestamp ${new Date(currentSegment.largestTimestamp)}) " + - s"into ${cleaned.baseOffset}, ${if(retainDeletes) "retaining" else "discarding"} deletes.") + s"into 
${cleaned.baseOffset}, ${if (retainDeletes) "retaining" else "discarding"} deletes.") try { cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletes, log.config.maxMessageSize, @@ -545,13 +552,13 @@ private[log] class Cleaner(val id: Int, * Clean the given source log segment into the destination segment using the key=>offset mapping * provided * - * @param topicPartition The topic and partition of the log segment to clean - * @param sourceRecords The dirty log segment - * @param dest The cleaned log segment - * @param map The key=>offset mapping - * @param retainDeletes Should delete tombstones be retained while cleaning this segment + * @param topicPartition The topic and partition of the log segment to clean + * @param sourceRecords The dirty log segment + * @param dest The cleaned log segment + * @param map The key=>offset mapping + * @param retainDeletes Should delete tombstones be retained while cleaning this segment * @param maxLogMessageSize The maximum message size of the corresponding topic - * @param stats Collector for cleaning statistics + * @param stats Collector for cleaning statistics */ private[log] def cleanInto(topicPartition: TopicPartition, sourceRecords: FileRecords, @@ -582,10 +589,10 @@ private[log] class Cleaner(val id: Int, override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = { if (discardBatchRecords) - // The batch is only retained to preserve producer sequence information; the records can be removed - false + // The batch is only retained to preserve producer sequence information; the records can be removed + false else - Cleaner.this.shouldRetainRecord(map, retainDeletes, batch, record, stats) + Cleaner.this.shouldRetainRecord(map, retainDeletes, batch, record, stats) } } @@ -702,7 +709,7 @@ private[log] class Cleaner(val id: Int, */ def growBuffers(maxLogMessageSize: Int) { val maxBufferSize = math.max(maxLogMessageSize, maxIoBufferSize) - if(readBuffer.capacity >= maxBufferSize || 
writeBuffer.capacity >= maxBufferSize) + if (readBuffer.capacity >= maxBufferSize || writeBuffer.capacity >= maxBufferSize) throw new IllegalStateException("This log contains a message larger than maximum allowable size of %s.".format(maxBufferSize)) val newSize = math.min(this.readBuffer.capacity * 2, maxBufferSize) info("Growing cleaner I/O buffers from " + readBuffer.capacity + "bytes to " + newSize + " bytes.") @@ -714,9 +721,9 @@ private[log] class Cleaner(val id: Int, * Restore the I/O buffer capacity to its original size */ def restoreBuffers() { - if(this.readBuffer.capacity > this.ioBufferSize) + if (this.readBuffer.capacity > this.ioBufferSize) this.readBuffer = ByteBuffer.allocate(this.ioBufferSize) - if(this.writeBuffer.capacity > this.ioBufferSize) + if (this.writeBuffer.capacity > this.ioBufferSize) this.writeBuffer = ByteBuffer.allocate(this.ioBufferSize) } @@ -725,26 +732,25 @@ private[log] class Cleaner(val id: Int, * We collect a group of such segments together into a single * destination segment. This prevents segment sizes from shrinking too much. 
* - * @param segments The log segments to group - * @param maxSize the maximum size in bytes for the total of all log data in a group + * @param segments The log segments to group + * @param maxSize the maximum size in bytes for the total of all log data in a group * @param maxIndexSize the maximum size in bytes for the total of all index data in a group - * * @return A list of grouped segments */ private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int, firstUncleanableOffset: Long): List[Seq[LogSegment]] = { var grouped = List[List[LogSegment]]() var segs = segments.toList - while(segs.nonEmpty) { + while (segs.nonEmpty) { var group = List(segs.head) var logSize = segs.head.size.toLong var indexSize = segs.head.offsetIndex.sizeInBytes.toLong var timeIndexSize = segs.head.timeIndex.sizeInBytes.toLong segs = segs.tail - while(segs.nonEmpty && - logSize + segs.head.size <= maxSize && - indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize && - timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize && - lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) { + while (segs.nonEmpty && + logSize + segs.head.size <= maxSize && + indexSize + segs.head.offsetIndex.sizeInBytes <= maxIndexSize && + timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize && + lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue) { group = segs.head :: group logSize += segs.head.size indexSize += segs.head.offsetIndex.sizeInBytes @@ -757,16 +763,16 @@ private[log] class Cleaner(val id: Int, } /** - * We want to get the last offset in the first log segment in segs. - * LogSegment.nextOffset() gives the exact last offset in a segment, but can be expensive since it requires - * scanning the segment from the last index entry. 
- * Therefore, we estimate the last offset of the first log segment by using - * the base offset of the next segment in the list. - * If the next segment doesn't exist, first Uncleanable Offset will be used. - * - * @param segs - remaining segments to group. - * @return The estimated last offset for the first segment in segs - */ + * We want to get the last offset in the first log segment in segs. + * LogSegment.nextOffset() gives the exact last offset in a segment, but can be expensive since it requires + * scanning the segment from the last index entry. + * Therefore, we estimate the last offset of the first log segment by using + * the base offset of the next segment in the list. + * If the next segment doesn't exist, first Uncleanable Offset will be used. + * + * @param segs - remaining segments to group. + * @return The estimated last offset for the first segment in segs + */ private def lastOffsetForFirstSegment(segs: List[LogSegment], firstUncleanableOffset: Long): Long = { if (segs.size > 1) { /* if there is a next segment, use its base offset as the bounding offset to guarantee we know @@ -780,10 +786,11 @@ private[log] class Cleaner(val id: Int, /** * Build a map of key_hash => offset for the keys in the cleanable dirty portion of the log to use in cleaning. 
- * @param log The log to use + * + * @param log The log to use * @param start The offset at which dirty messages begin - * @param end The ending offset for the map that is being built - * @param map The map in which to store the mappings + * @param end The ending offset for the map that is being built + * @param map The map in which to store the mappings * @param stats Collector for cleaning statistics */ private[log] def buildOffsetMap(log: Log, @@ -816,9 +823,8 @@ private[log] class Cleaner(val id: Int, * Add the messages in the given segment to the offset map * * @param segment The segment to index - * @param map The map in which to store the key=>offset mapping - * @param stats Collector for cleaning statistics - * + * @param map The map in which to store the key=>offset mapping + * @param stats Collector for cleaning statistics * @return If the map was filled whilst loading from this segment */ private def buildOffsetMapForSegment(topicPartition: TopicPartition, @@ -875,7 +881,7 @@ private[log] class Cleaner(val id: Int, stats.indexBytesRead(bytesRead) // if we didn't read even one complete message, our read buffer may be too small - if(position == startPosition) + if (position == startPosition) growBuffersOrFail(segment.log, position, maxLogMessageSize, records) } restoreBuffers() @@ -929,9 +935,9 @@ private class CleanerStats(time: Time = Time.SYSTEM) { endTime = time.milliseconds } - def elapsedSecs = (endTime - startTime)/1000.0 + def elapsedSecs = (endTime - startTime) / 1000.0 - def elapsedIndexSecs = (mapCompleteTime - startTime)/1000.0 + def elapsedIndexSecs = (mapCompleteTime - startTime) / 1000.0 } @@ -945,6 +951,7 @@ private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDir val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum val totalBytes = cleanBytes + cleanableBytes val cleanableRatio = cleanableBytes / totalBytes.toDouble + override def 
compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 32203ac..b2fb02c 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -76,6 +76,7 @@ class LogManager(logDirs: Seq[File], private val futureLogs = new Pool[TopicPartition, Log]() // Each element in the queue contains the log object to be deleted and the time it is scheduled for deletion. private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]() + private val tpToBeDeleted = new CopyOnWriteArraySet[TopicPartition]() private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs) @volatile private var _currentDefaultConfig = initialDefaultConfig @@ -112,7 +113,7 @@ class LogManager(logDirs: Seq[File], // public, so we can access this from kafka.admin.DeleteTopicTest val cleaner: LogCleaner = - if(cleanerConfig.enableCleaner) + if (cleanerConfig.enableCleaner) new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time) else null @@ -210,7 +211,8 @@ class LogManager(logDirs: Seq[File], removedLog.closeHandlers() removedLog.removeLogMetrics() } - }} + } + } val offlineFutureTopicPartitions = futureLogs.collect { case (tp, log) if log.dir.getParent == dir => tp @@ -221,10 +223,11 @@ class LogManager(logDirs: Seq[File], removedLog.closeHandlers() removedLog.removeLogMetrics() } - }} + } + } info(s"Logs for partitions 
${offlineCurrentTopicPartitions.mkString(",")} are offline and " + - s"logs for future partitions ${offlineFutureTopicPartitions.mkString(",")} are offline due to failure on log directory $dir") + s"logs for future partitions ${offlineFutureTopicPartitions.mkString(",")} are offline due to failure on log directory $dir") dirLocks.filter(_.file.getParent == dir).foreach(dir => CoreUtils.swallow(dir.destroy(), this)) } } @@ -250,6 +253,7 @@ class LogManager(logDirs: Seq[File], private def addLogToBeDeleted(log: Log): Unit = { this.logsToBeDeleted.add((log, time.milliseconds())) + this.tpToBeDeleted.add(log.topicPartition) } // Only for testing @@ -272,9 +276,10 @@ class LogManager(logDirs: Seq[File], scheduler = scheduler, time = time, brokerTopicStats = brokerTopicStats, - logDirFailureChannel = logDirFailureChannel) + logDirFailureChannel = logDirFailureChannel + , isFuture = false) - if (logDir.getName.endsWith(Log.DeleteDirSuffix)) { + if (LogSegment.getStatus(logDir) == SegmentStatus.DELETED) { addLogToBeDeleted(log) } else { val previous = { @@ -385,37 +390,37 @@ class LogManager(logDirs: Seq[File], } /** - * Start the background threads to flush logs and do log cleanup + * Start the background threads to flush logs and do log cleanup */ def startup() { /* Schedule the cleanup task to delete old logs */ if (scheduler != null) { info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs)) scheduler.schedule("kafka-log-retention", - cleanupLogs _, - delay = InitialTaskDelayMs, - period = retentionCheckMs, - TimeUnit.MILLISECONDS) + cleanupLogs _, + delay = InitialTaskDelayMs, + period = retentionCheckMs, + TimeUnit.MILLISECONDS) info("Starting log flusher with a default period of %d ms.".format(flushCheckMs)) scheduler.schedule("kafka-log-flusher", - flushDirtyLogs _, - delay = InitialTaskDelayMs, - period = flushCheckMs, - TimeUnit.MILLISECONDS) + flushDirtyLogs _, + delay = InitialTaskDelayMs, + period = flushCheckMs, + TimeUnit.MILLISECONDS) 
scheduler.schedule("kafka-recovery-point-checkpoint", - checkpointLogRecoveryOffsets _, - delay = InitialTaskDelayMs, - period = flushRecoveryOffsetCheckpointMs, - TimeUnit.MILLISECONDS) + checkpointLogRecoveryOffsets _, + delay = InitialTaskDelayMs, + period = flushRecoveryOffsetCheckpointMs, + TimeUnit.MILLISECONDS) scheduler.schedule("kafka-log-start-offset-checkpoint", - checkpointLogStartOffsets _, - delay = InitialTaskDelayMs, - period = flushStartOffsetCheckpointMs, - TimeUnit.MILLISECONDS) + checkpointLogStartOffsets _, + delay = InitialTaskDelayMs, + period = flushStartOffsetCheckpointMs, + TimeUnit.MILLISECONDS) scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period - deleteLogs _, - delay = InitialTaskDelayMs, - unit = TimeUnit.MILLISECONDS) + deleteLogs _, + delay = InitialTaskDelayMs, + unit = TimeUnit.MILLISECONDS) } if (cleanerConfig.enableCleaner) cleaner.startup() @@ -492,7 +497,7 @@ class LogManager(logDirs: Seq[File], * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset * * @param partitionOffsets Partition logs that need to be truncated - * @param isFuture True iff the truncation should be performed on the future log of the specified partitions + * @param isFuture True iff the truncation should be performed on the future log of the specified partitions */ def truncateTo(partitionOffsets: Map[TopicPartition, Long], isFuture: Boolean) { var truncated = false @@ -529,8 +534,8 @@ class LogManager(logDirs: Seq[File], * Delete all data in a partition and start the log at the new offset * * @param topicPartition The partition whose log needs to be truncated - * @param newOffset The new offset to start the log with - * @param isFuture True iff the truncation should be performed on the future log of the specified partition + * @param newOffset The new offset to start the log with + * @param isFuture True iff the truncation should be performed on the 
future log of the specified partition */ def truncateFullyAndStartAt(topicPartition: TopicPartition, newOffset: Long, isFuture: Boolean) { val log = { @@ -541,7 +546,7 @@ class LogManager(logDirs: Seq[File], } // If the log does not exist, skip it if (log != null) { - //Abort and pause the cleaning of the log, and resume after truncation is done. + //Abort and pause the cleaning of the log, and resume after truncation is done. if (cleaner != null && !isFuture) cleaner.abortAndPauseCleaning(topicPartition) log.truncateFullyAndStartAt(newOffset) @@ -612,7 +617,7 @@ class LogManager(logDirs: Seq[File], def maybeUpdatePreferredLogDir(topicPartition: TopicPartition, logDir: String): Unit = { // Do not cache the preferred log directory if either the current log or the future log for this partition exists in the specified logDir if (!getLog(topicPartition).exists(_.dir.getParent == logDir) && - !getLog(topicPartition, isFuture = true).exists(_.dir.getParent == logDir)) + !getLog(topicPartition, isFuture = true).exists(_.dir.getParent == logDir)) preferredLogDirs.put(topicPartition, logDir) } @@ -626,7 +631,7 @@ class LogManager(logDirs: Seq[File], * Get the log if it exists, otherwise return None * * @param topicPartition the partition of the log - * @param isFuture True iff the future log of the specified partition should be returned + * @param isFuture True iff the future log of the specified partition should be returned */ def getLog(topicPartition: TopicPartition, isFuture: Boolean = false): Option[Log] = { if (isFuture) @@ -641,14 +646,20 @@ class LogManager(logDirs: Seq[File], * Otherwise throw KafkaStorageException * * @param topicPartition The partition whose log needs to be returned or created - * @param config The configuration of the log that should be applied for log creation - * @param isNew Whether the replica should have existed on the broker or not - * @param isFuture True iff the future log of the specified partition should be returned or created + * 
@param config The configuration of the log that should be applied for log creation + * @param isNew Whether the replica should have existed on the broker or not + * @param isFuture True iff the future log of the specified partition should be returned or created * @throws KafkaStorageException if isNew=false, log is not found in the cache and there is offline log directory on the broker */ def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false, isFuture: Boolean = false): Log = { logCreationOrDeletionLock synchronized { getLog(topicPartition, isFuture).getOrElse { + + while (tpToBeDeleted.contains(topicPartition)) { + Thread.sleep(500) + warn(s"Can not create log for $topicPartition because current topicPartition: $topicPartition is under deletion ") + } + // create the log if it has not already been created in another thread if (!isNew && offlineLogDirs.nonEmpty) throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline") @@ -672,12 +683,9 @@ class LogManager(logDirs: Seq[File], throw new KafkaStorageException(s"Can not create log for $topicPartition because log directory $logDir is offline") try { - val dir = { - if (isFuture) - new File(logDir, Log.logFutureDirName(topicPartition)) - else - new File(logDir, Log.logDirName(topicPartition)) - } + + val dir = new File(logDir, Log.logDirName(topicPartition)) + Files.createDirectories(dir.toPath) val log = Log( @@ -690,7 +698,8 @@ class LogManager(logDirs: Seq[File], scheduler = scheduler, time = time, brokerTopicStats = brokerTopicStats, - logDirFailureChannel = logDirFailureChannel) + logDirFailureChannel = logDirFailureChannel, + isFuture = isFuture) if (isFuture) futureLogs.put(topicPartition, log) @@ -714,11 +723,11 @@ class LogManager(logDirs: Seq[File], } /** - * Delete logs marked for deletion. 
Delete all logs for which `currentDefaultConfig.fileDeleteDelayMs` - * has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed will be - * considered for deletion in the next iteration of `deleteLogs`. The next iteration will be executed - * after the remaining time for the first log that is not deleted. If there are no more `logsToBeDeleted`, - * `deleteLogs` will be executed after `currentDefaultConfig.fileDeleteDelayMs`. + * Delete logs marked for deletion. Delete all logs for which `currentDefaultConfig.fileDeleteDelayMs` + * has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed will be + * considered for deletion in the next iteration of `deleteLogs`. The next iteration will be executed + * after the remaining time for the first log that is not deleted. If there are no more `logsToBeDeleted`, + * `deleteLogs` will be executed after `currentDefaultConfig.fileDeleteDelayMs`. */ private def deleteLogs(): Unit = { var nextDelayMs = 0L @@ -731,11 +740,15 @@ class LogManager(logDirs: Seq[File], currentDefaultConfig.fileDeleteDelayMs } - while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) { + while ( { + nextDelayMs = nextDeleteDelayMs; + nextDelayMs <= 0 + }) { val (removedLog, _) = logsToBeDeleted.take() if (removedLog != null) { try { removedLog.delete() + tpToBeDeleted.remove(removedLog.topicPartition) info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.") } catch { case e: KafkaStorageException => @@ -763,11 +776,11 @@ class LogManager(logDirs: Seq[File], } /** - * Mark the partition directory in the source log directory for deletion and - * rename the future log of this partition in the destination log directory to be the current log - * - * @param topicPartition TopicPartition that needs to be swapped - */ + * Mark the partition directory in the source log directory for deletion and + * rename the future log of this partition in 
the destination log directory to be the current log + * + * @param topicPartition TopicPartition that needs to be swapped + */ def replaceCurrentWithFutureLog(topicPartition: TopicPartition): Unit = { logCreationOrDeletionLock synchronized { val sourceLog = currentLogs.get(topicPartition) @@ -778,7 +791,7 @@ class LogManager(logDirs: Seq[File], if (destLog == null) throw new KafkaStorageException(s"The future replica for $topicPartition is offline") - destLog.renameDir(Log.logDirName(topicPartition)) + LogSegment.setStatus(destLog.dir, SegmentStatus.HOT) // Now that future replica has been successfully renamed to be the current replica // Update the cached map and log cleaner as appropriate. futureLogs.remove(topicPartition) @@ -789,9 +802,9 @@ class LogManager(logDirs: Seq[File], } try { - sourceLog.renameDir(Log.logDeleteDirName(topicPartition)) + LogSegment.setStatus(sourceLog.dir, SegmentStatus.DELETED) // Now that replica in source log directory has been successfully renamed for deletion. - // Close the log, update checkpoint files, and enqueue this log to be deleted. + // Close the log, update checkpoint files, and enqueue this log to be deleted sourceLog.close() checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile) checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile) @@ -810,13 +823,13 @@ class LogManager(logDirs: Seq[File], } /** - * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and - * add it in the queue for deletion. - * - * @param topicPartition TopicPartition that needs to be deleted - * @param isFuture True iff the future log of the specified partition should be deleted - * @return the removed log - */ + * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and + * add it in the queue for deletion. 
+ * + * @param topicPartition TopicPartition that needs to be deleted + * @param isFuture True iff the future log of the specified partition should be deleted + * @return the removed log + */ def asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false): Log = { val removedLog: Log = logCreationOrDeletionLock synchronized { if (isFuture) @@ -830,7 +843,8 @@ class LogManager(logDirs: Seq[File], cleaner.abortCleaning(topicPartition) cleaner.updateCheckpoints(removedLog.dir.getParentFile) } - removedLog.renameDir(Log.logDeleteDirName(topicPartition)) + LogSegment.setStatus(removedLog.dir, SegmentStatus.DELETED) + removedLog.close() checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile) checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile) addLogToBeDeleted(removedLog) @@ -847,7 +861,7 @@ class LogManager(logDirs: Seq[File], * data directory with the fewest partitions. */ private def nextLogDir(): File = { - if(_liveLogDirs.size == 1) { + if (_liveLogDirs.size == 1) { _liveLogDirs.peek() } else { // count the number of logs in each parent directory (including 0 for empty directories @@ -869,12 +883,12 @@ class LogManager(logDirs: Seq[File], debug("Beginning log cleanup...") var total = 0 val startMs = time.milliseconds - for(log <- allLogs; if !log.config.compact) { + for (log <- allLogs; if !log.config.compact) { debug("Garbage collecting '" + log.name + "'") total += log.deleteOldSegments() } debug("Log cleanup completed. 
" + total + " files deleted in " + - (time.milliseconds - startMs) / 1000 + " seconds") + (time.milliseconds - startMs) / 1000 + " seconds") } /** @@ -916,8 +930,8 @@ class LogManager(logDirs: Seq[File], try { val timeSinceLastFlush = time.milliseconds - log.lastFlushTime debug("Checking if flush is needed on " + topicPartition.topic + " flush interval " + log.config.flushMs + - " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush) - if(timeSinceLastFlush >= log.config.flushMs) + " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush) + if (timeSinceLastFlush >= log.config.flushMs) log.flush } catch { case e: Throwable => diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 58600bc..d317f1f 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,11 +17,11 @@ package kafka.log import java.io.{File, IOException} -import java.nio.file.{Files, NoSuchFileException} +import java.nio.file.{Files, NoSuchFileException, Path} import java.nio.file.attribute.FileTime import java.util.concurrent.TimeUnit -import kafka.common.LogSegmentOffsetOverflowException +import kafka.common.{KafkaException, LogSegmentOffsetOverflowException} import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.server.epoch.LeaderEpochCache import kafka.server.{FetchDataInfo, LogOffsetMetadata} @@ -42,24 +42,26 @@ import scala.math._ * * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. * - * @param log The file records containing log entries - * @param offsetIndex The offset index - * @param timeIndex The timestamp index - * @param baseOffset A lower bound on the offsets in this segment + * @param log The file records containing log entries + * @param offsetIndex The offset index + * @param timeIndex The timestamp index + * @param baseOffset A lower bound on the offsets in this segment * @param indexIntervalBytes The approximate number of bytes between entries in the index - * @param time The time instance + * @param time The time instance */ @nonthreadsafe -class LogSegment private[log] (val log: FileRecords, - val offsetIndex: OffsetIndex, - val timeIndex: TimeIndex, - val txnIndex: TransactionIndex, - val baseOffset: Long, - val indexIntervalBytes: Int, - val rollJitterMs: Long, - val maxSegmentMs: Long, - val maxSegmentBytes: Int, - val time: Time) extends Logging { +class LogSegment private[log](val log: FileRecords, + val offsetIndex: OffsetIndex, + val timeIndex: TimeIndex, 
+ val txnIndex: TransactionIndex, + val baseOffset: Long, + val indexIntervalBytes: Int, + val rollJitterMs: Long, + val maxSegmentMs: Long, + val maxSegmentBytes: Int, + val time: Time, dir: File) extends Logging { + + def segDir: File = dir def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = { val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs @@ -68,6 +70,37 @@ class LogSegment private[log] (val log: FileRecords, offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(maxOffsetInMessages) } + /** + * Get the status for the for this log segment + * IOException from this method should be handled by the caller + */ + def getSegmentStatus(): SegmentStatus = { + SegmentStatusHandler.getStatus(new File(segDir, SegmentFile.STATUS.getName)) + } + + + /** + * Change the status for the for this log segment + * IOException from this method should be handled by the caller + */ + def changeSegmentStatus(segmentStatus: SegmentStatus): Unit = { + changeSegmentStatus(SegmentStatus.HOT, segmentStatus) + } + + /** + * Change the status for the for this log segment + * IOException from this method should be handled by the caller + */ + def changeSegmentStatus(oldStatus: SegmentStatus, newStatus: SegmentStatus): Unit = { + val file = new File(segDir, SegmentFile.STATUS.getName) + val segStatus = SegmentStatusHandler.getStatus(file) + if (segStatus == oldStatus) { + SegmentStatusHandler.setStatus(file, newStatus) + } else { + throw new KafkaException(s"Invalid Segment Status $segStatus, expected $oldStatus") + } + } + def resizeIndexes(size: Int): Unit = { offsetIndex.resize(size) timeIndex.resize(size) @@ -113,10 +146,10 @@ class LogSegment private[log] (val log: FileRecords, * * It is assumed this method is being called from within a lock. 
* - * @param largestOffset The last offset in the message set - * @param largestTimestamp The largest timestamp in the message set. + * @param largestOffset The last offset in the message set + * @param largestTimestamp The largest timestamp in the message set. * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append. - * @param records The log entries to append. + * @param records The log entries to append. * @return the physical position in the file of the appended records * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow */ @@ -127,7 +160,7 @@ class LogSegment private[log] (val log: FileRecords, records: MemoryRecords): Unit = { if (records.sizeInBytes > 0) { trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " + - s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp") + s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp") val physicalPosition = log.sizeInBytes() if (physicalPosition == 0) rollingBasedTimestamp = Some(largestTimestamp) @@ -242,11 +275,11 @@ class LogSegment private[log] (val log: FileRecords, * The startingFilePosition argument is an optimization that can be used if we already know a valid starting position * in the file higher than the greatest-lower-bound from the index. * - * @param offset The offset we want to translate + * @param offset The offset we want to translate * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and - * when omitted, the search will begin at the position in the offset index. + * when omitted, the search will begin at the position in the offset index. 
* @return The position in the log storing the message with the least offset >= the requested offset and the size of the - * message or null if no message meets this criteria. + * message or null if no message meets this criteria. */ @threadsafe private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = { @@ -258,12 +291,11 @@ class LogSegment private[log] (val log: FileRecords, * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified. * - * @param startOffset A lower bound on the first offset to include in the message set we read - * @param maxOffset An optional maximum offset for the message set we read - * @param maxSize The maximum number of bytes to include in the message set we read - * @param maxPosition The maximum position in the log segment that should be exposed for read + * @param startOffset A lower bound on the first offset to include in the message set we read + * @param maxOffset An optional maximum offset for the message set we read + * @param maxSize The maximum number of bytes to include in the message set we read + * @param maxPosition The maximum position in the log segment that should be exposed for read * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists) - * * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset, * or null if the startOffset is larger than the largest offset in this log */ @@ -308,7 +340,7 @@ class LogSegment private[log] (val log: FileRecords, if (mapping == null) logSize // the max offset is off the end of the log, use the end of the file else - mapping.position + mapping.position min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt } @@ -316,8 +348,8 @@ class LogSegment private[log] (val log: FileRecords, 
firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size) } - def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Option[Long] = - offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset) + def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Option[Long] = + offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset) /** * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes @@ -325,7 +357,7 @@ class LogSegment private[log] (val log: FileRecords, * * @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover * the transaction index. - * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery. + * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery. * @return The number of bytes truncated from the log * @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow */ @@ -359,7 +391,7 @@ class LogSegment private[log] (val log: FileRecords, if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) { leaderEpochCache.foreach { cache => if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign() - cache.assign(batch.partitionLeaderEpoch, batch.baseOffset) + cache.assign(batch.partitionLeaderEpoch, batch.baseOffset) } updateProducerState(producerStateManager, batch) } @@ -512,7 +544,7 @@ class LogSegment private[log] (val log: FileRecords, * segment is rolled if the difference between the current wall clock time and the segment create time exceeds the * segment rolling time. 
*/ - def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = { + def timeWaitedForRoll(now: Long, messageTimestamp: Long): Long = { // Load the timestamp of the first message into memory if (rollingBasedTimestamp.isEmpty) { val iter = log.batches.iterator() @@ -533,16 +565,16 @@ class LogSegment private[log] (val log: FileRecords, * - If all the messages in the segment have smaller offsets, return None * - If all the messages in the segment have smaller timestamps, return None * - If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp - * the returned the offset will be max(the base offset of the segment, startingOffset) and the timestamp will be Message.NoTimestamp. + * the returned the offset will be max(the base offset of the segment, startingOffset) and the timestamp will be Message.NoTimestamp. * - Otherwise, return an option of TimestampOffset. The offset is the offset of the first message whose timestamp - * is greater than or equals to the target timestamp and whose offset is greater than or equals to the startingOffset. + * is greater than or equals to the target timestamp and whose offset is greater than or equals to the startingOffset. * * This methods only returns None when 1) all messages' offset < startOffing or 2) the log is not empty but we did not * see any message when scanning the log from the indexed position. The latter could happen if the log is truncated * after we get the indexed position but before we scan the log from there. In this case we simply return None and the * caller will need to check on the truncated log and maybe retry or even do the search on another log segment. * - * @param timestamp The timestamp to search for. + * @param timestamp The timestamp to search for. * @param startingOffset The starting offset to search. * @return the timestamp and offset of the first message that meets the requirements. None will be returned if there is no such message. 
*/ @@ -569,8 +601,8 @@ class LogSegment private[log] (val log: FileRecords, } /** - * Close file handlers used by the log segment but don't write to disk. This is used when the disk may have failed - */ + * Close file handlers used by the log segment but don't write to disk. This is used when the disk may have failed + */ def closeHandlers() { CoreUtils.swallow(offsetIndex.closeHandler(), this) CoreUtils.swallow(timeIndex.closeHandler(), this) @@ -626,28 +658,117 @@ class LogSegment private[log] (val log: FileRecords, object LogSegment { - def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false, - initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = { + private val random = scala.util.Random + + def open(segDir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false, + initFileSize: Int = 0, preallocate: Boolean = false, segmentStatus: SegmentStatus = SegmentStatus.HOT): LogSegment = { val maxIndexSize = config.maxIndexSize + if (!fileAlreadyExists) { + segDir.mkdirs() + SegmentStatusHandler.setStatus(new File(segDir, SegmentFile.STATUS.getName), segmentStatus) + } new LogSegment( - FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate), - new OffsetIndex(Log.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize), - new TimeIndex(Log.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize), - new TransactionIndex(baseOffset, Log.transactionIndexFile(dir, baseOffset, fileSuffix)), + FileRecords.open(Log.logFile(segDir, baseOffset, SegmentFile.LOG.getName), fileAlreadyExists, initFileSize, preallocate), + new OffsetIndex(Log.offsetIndexFile(segDir, baseOffset, SegmentFile.OFFSET_INDEX.getName), baseOffset = baseOffset, maxIndexSize = maxIndexSize), + new TimeIndex(Log.timeIndexFile(segDir, baseOffset, 
SegmentFile.TIME_INDEX.getName), baseOffset = baseOffset, maxIndexSize = maxIndexSize), + new TransactionIndex(baseOffset, Log.transactionIndexFile(segDir, baseOffset, SegmentFile.TXN_INDEX.getName)), baseOffset, indexIntervalBytes = config.indexInterval, rollJitterMs = config.randomSegmentJitter, maxSegmentMs = config.segmentMs, maxSegmentBytes = config.segmentSize, - time) + time, segDir) } - def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = { - Log.deleteFileIfExists(Log.offsetIndexFile(dir, baseOffset, fileSuffix)) - Log.deleteFileIfExists(Log.timeIndexFile(dir, baseOffset, fileSuffix)) - Log.deleteFileIfExists(Log.transactionIndexFile(dir, baseOffset, fileSuffix)) - Log.deleteFileIfExists(Log.logFile(dir, baseOffset, fileSuffix)) + def isSegmentDir(file: File): Boolean = { + file.isDirectory && !file.getName.equals("snapshot") + } + + def getCleanedSegmentFiles(logDir: File, offset: Long): Array[File] = { + val fNameDir = String.valueOf(offset) + val fNameDyn = fNameDir + "-" + val files = logDir.listFiles() + files.filter(file => (file.getName.startsWith(fNameDyn) || file.getName.equals(fNameDir)) && getStatus(file) == SegmentStatus.CLEANED) + } + + def getSegmentDir(logDir: File, baseOffset: Long, randomDigits: Boolean = false): File = { + new File(logDir, if (randomDigits) baseOffset + "-" + random.nextInt(100000) else String.valueOf(baseOffset)) + } + + def getSegmentOffset(segDir: File): Long = { + val name = segDir.getName + val index = name.indexOf("-") + if (index > -1) { + name.substring(0, index).toLong + } else { + name.toLong + } } + + def deleteIfExists(segDir: File): Unit = { + if (segDir.exists()) { + val files = segDir.listFiles() + if (files != null) { + files.foreach(file => Files.deleteIfExists(file.toPath)) + } + Files.deleteIfExists(segDir.toPath) + } + } + + def deleteIndicesIfExist(segDir: File): Unit = { + Files.deleteIfExists(new File(segDir, SegmentFile.OFFSET_INDEX.getName).toPath) + 
Files.deleteIfExists(new File(segDir, SegmentFile.TIME_INDEX.getName).toPath) + Files.deleteIfExists(new File(segDir, SegmentFile.TXN_INDEX.getName).toPath) + } + + def getSnapshotDir(logDir: File): File = { + new File(logDir, "snapshot") + } + + def getSnapshotFile(logDir: File, baseOffset: Long): File = { + val segDir = new File(logDir, "snapshot") + new File(segDir, String.valueOf(baseOffset)) + } + + def getSnapshotOffset(snapshot: File): Long = { + snapshot.getName.toLong + } + + def getStatus(segDir: File): SegmentStatus = { + val statusFile = new File(segDir, SegmentFile.STATUS.getName) + if (statusFile.exists()) { + SegmentStatusHandler.getStatus(statusFile) + } else { + SegmentStatus.UNKNOWN; + } + } + + def setStatus(segDir: File, status: SegmentStatus): Unit = { + val statusFile = new File(segDir, SegmentFile.STATUS.getName) + SegmentStatusHandler.setStatus(statusFile, status) + } + + + def isSegmentFileExists(dir: File, baseOffset: Long, segmentFile: SegmentFile): Boolean = { + val segDir = new File(dir, String.valueOf(baseOffset)) + new File(segDir, "." 
+ segmentFile.getName).exists + } + + def isSegmentFileExists(segDir: File, segmentFile: SegmentFile): Boolean = { + new File(segDir, segmentFile.getName).exists + } + + def canReadSegment(segDir: File): Boolean = { + val files = segDir.listFiles + var status = true + if (files != null) { + for (file <- files if file.isFile) { + status = status && file.canRead + } + } + status + } + } object LogFlushStats extends KafkaMetricsGroup { diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index caca9a8..e9aa776 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -38,24 +38,26 @@ class CorruptSnapshotException(msg: String) extends KafkaException(msg) // ValidationType and its subtypes define the extent of the validation to perform on a given ProducerAppendInfo instance private[log] sealed trait ValidationType + private[log] object ValidationType { /** - * This indicates no validation should be performed on the incoming append. This is the case for all appends on - * a replica, as well as appends when the producer state is being built from the log. - */ + * This indicates no validation should be performed on the incoming append. This is the case for all appends on + * a replica, as well as appends when the producer state is being built from the log. + */ case object None extends ValidationType /** - * We only validate the epoch (and not the sequence numbers) for offset commit requests coming from the transactional - * producer. These appends will not have sequence numbers, so we can't validate them. - */ + * We only validate the epoch (and not the sequence numbers) for offset commit requests coming from the transactional + * producer. These appends will not have sequence numbers, so we can't validate them. + */ case object EpochOnly extends ValidationType /** - * Perform the full validation. 
This should be used fo regular produce requests coming to the leader. - */ + * Perform the full validation. This should be used for regular produce requests coming to the leader. + */ case object Full extends ValidationType + } private[log] case class TxnMetadata(producerId: Long, var firstOffset: LogOffsetMetadata, var lastOffset: Option[Long] = None) { @@ -71,11 +73,13 @@ private[log] object ProducerStateEntry { private[log] val NumBatchesToRetain = 5 + def empty(producerId: Long) = new ProducerStateEntry(producerId, mutable.Queue[BatchMetadata](), RecordBatch.NO_PRODUCER_EPOCH, -1, None) } private[log] case class BatchMetadata(lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long) { def firstSeq = lastSeq - offsetDelta + def firstOffset = lastOffset - offsetDelta override def toString: String = { @@ -107,7 +111,7 @@ private[log] class ProducerStateEntry(val producerId: Long, def lastTimestamp = if (isEmpty) RecordBatch.NO_TIMESTAMP else batchMetadata.last.timestamp - def lastOffsetDelta : Int = if (isEmpty) 0 else batchMetadata.last.offsetDelta + def lastOffsetDelta: Int = if (isEmpty) 0 else batchMetadata.last.offsetDelta def isEmpty: Boolean = batchMetadata.isEmpty @@ -144,7 +148,7 @@ private[log] class ProducerStateEntry(val producerId: Long, def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = { if (batch.producerEpoch != producerEpoch) - None + None else batchWithSequenceRange(batch.baseSequence, batch.lastSequence) } @@ -173,11 +177,11 @@ private[log] class ProducerStateEntry(val producerId: Long, * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata * as the incoming records are validated.
* - * @param producerId The id of the producer appending to the log - * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of - * the most recent appends made by the producer. Validation of the first incoming append will - * be made against the latest append in the current entry. New appends will replace older appends - * in the current entry so that the space overhead is constant. + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. * @param validationType Indicates the extent of validation to perform on the appends on this instance. Offset commits * coming from the producer should have ValidationType.EpochOnly. Appends which aren't from a client * should have ValidationType.None. Appends coming from a client for produce requests should have @@ -378,7 +382,7 @@ object ProducerStateManager { throw new CorruptSnapshotException(s"Snapshot contained an unknown file version $version") val crc = struct.getUnsignedInt(CrcField) - val computedCrc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset) + val computedCrc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset) if (crc != computedCrc) throw new CorruptSnapshotException(s"Snapshot is corrupt (CRC is no longer valid). " + s"Stored crc: $crc. 
Computed crc: $computedCrc") @@ -438,13 +442,13 @@ object ProducerStateManager { } } - private def isSnapshotFile(file: File): Boolean = file.getName.endsWith(Log.ProducerSnapshotFileSuffix) // visible for testing private[log] def listSnapshotFiles(dir: File): Seq[File] = { - if (dir.exists && dir.isDirectory) { - Option(dir.listFiles).map { files => - files.filter(f => f.isFile && isSnapshotFile(f)).toSeq + val snapshotDir = LogSegment.getSnapshotDir(dir) + if (snapshotDir.exists && snapshotDir.isDirectory) { + Option(snapshotDir.listFiles).map { files => + files.filter(f => f.isFile).toSeq }.getOrElse(Seq.empty) } else Seq.empty } @@ -453,7 +457,7 @@ object ProducerStateManager { private[log] def deleteSnapshotsBefore(dir: File, offset: Long): Unit = deleteSnapshotFiles(dir, _ < offset) private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => true) { - listSnapshotFiles(dir).filter(file => predicate(offsetFromFile(file))).foreach { file => + listSnapshotFiles(dir).filter(file => predicate(LogSegment.getSegmentOffset(file))).foreach { file => Files.deleteIfExists(file.toPath) } } @@ -480,6 +484,7 @@ object ProducerStateManager { class ProducerStateManager(val topicPartition: TopicPartition, @volatile var logDir: File, val maxProducerIdExpirationMs: Int = 60 * 60 * 1000) extends Logging { + import ProducerStateManager._ import java.util @@ -549,7 +554,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, isProducerRetained(producerEntry, logStartOffset) && !isProducerExpired(currentTime, producerEntry) } loadedProducers.foreach(loadProducerEntry) - lastSnapOffset = offsetFromFile(file) + lastSnapOffset = offsetFromSnapshotFile(file) lastMapOffset = lastSnapOffset return } catch { @@ -661,7 +666,11 @@ class ProducerStateManager(val topicPartition: TopicPartition, def takeSnapshot(): Unit = { // If not a new offset, then it is not worth taking another snapshot if (lastMapOffset > lastSnapOffset) { - val snapshotFile = 
Log.producerSnapshotFile(logDir, lastMapOffset) + val snapshotFile = LogSegment.getSnapshotFile(logDir, lastMapOffset) + val parentDir = snapshotFile.getParentFile + if (!parentDir.exists()) { + parentDir.mkdirs() + } info(s"Writing producer snapshot at offset $lastMapOffset") writeSnapshot(snapshotFile, producers) @@ -673,12 +682,12 @@ class ProducerStateManager(val topicPartition: TopicPartition, /** * Get the last offset (exclusive) of the latest snapshot file. */ - def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromFile(file)) + def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromSnapshotFile(file)) /** * Get the last offset (exclusive) of the oldest snapshot file. */ - def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file)) + def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromSnapshotFile(file)) private def isProducerRetained(producerStateEntry: ProducerStateEntry, logStartOffset: Long): Boolean = { producerStateEntry.removeBatchesOlderThan(logStartOffset) @@ -764,7 +773,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, private def oldestSnapshotFile: Option[File] = { val files = listSnapshotFiles if (files.nonEmpty) - Some(files.minBy(offsetFromFile)) + Some(files.minBy(offsetFromSnapshotFile)) else None } @@ -772,11 +781,15 @@ class ProducerStateManager(val topicPartition: TopicPartition, private def latestSnapshotFile: Option[File] = { val files = listSnapshotFiles if (files.nonEmpty) - Some(files.maxBy(offsetFromFile)) + Some(files.maxBy(offsetFromSnapshotFile)) else None } private def listSnapshotFiles: Seq[File] = ProducerStateManager.listSnapshotFiles(logDir) + def offsetFromSnapshotFile(file: File): Long = { + LogSegment.getSnapshotOffset(file) + } + } diff --git a/core/src/main/scala/kafka/log/SegmentFile.java b/core/src/main/scala/kafka/log/SegmentFile.java new file mode 100644 index 0000000..b58bc12 --- 
/dev/null +++ b/core/src/main/scala/kafka/log/SegmentFile.java @@ -0,0 +1,20 @@ +package kafka.log; + +import java.util.Objects; + +public enum SegmentFile { + + LOG("log"), OFFSET_INDEX("index"), TIME_INDEX("timeindex"), + TXN_INDEX("txnindex"), STATUS("status"); + + + private final String name; + + SegmentFile(String name) { + this.name = Objects.requireNonNull(name); + } + + public String getName() { + return name; + } +} diff --git a/core/src/main/scala/kafka/log/SegmentStatus.java b/core/src/main/scala/kafka/log/SegmentStatus.java new file mode 100644 index 0000000..6b135e6 --- /dev/null +++ b/core/src/main/scala/kafka/log/SegmentStatus.java @@ -0,0 +1,34 @@ +package kafka.log; + +public enum SegmentStatus { + HOT(1), DELETED(2), CLEANED(3), SWAP(4), UNKNOWN(5), FUTURE(6); + + private final int status; + + SegmentStatus(int status) { + this.status = status; + } + + public int getStatus() { + return status; + } + + public static SegmentStatus getStatus(int id) { + switch (id) { + case 1: + return HOT; + case 2: + return DELETED; + case 3: + return CLEANED; + case 4: + return SWAP; + case 5: + return UNKNOWN; + case 6: + return FUTURE; + default: + throw new RuntimeException("Invalid file status : " + id); + } + } +} diff --git a/core/src/main/scala/kafka/log/SegmentStatusHandler.java b/core/src/main/scala/kafka/log/SegmentStatusHandler.java new file mode 100644 index 0000000..32450e7 --- /dev/null +++ b/core/src/main/scala/kafka/log/SegmentStatusHandler.java @@ -0,0 +1,23 @@ +package kafka.log; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; + +public class SegmentStatusHandler { + + public static SegmentStatus getStatus(File file) throws IOException { + if (!file.exists()) { + return SegmentStatus.UNKNOWN; + } else { + final byte[] data = Files.readAllBytes(file.toPath()); + return SegmentStatus.getStatus(ByteBuffer.wrap(data).getInt()); + } + } + + public static void setStatus(File file, 
SegmentStatus status) throws IOException { + Files.write(file.toPath(), ByteBuffer.allocate(Integer.BYTES).putInt(status.getStatus()).array()); + } + +} diff --git a/core/src/main/scala/kafka/log/package.html b/core/src/main/scala/kafka/log/package.html deleted file mode 100644 index ee2f72e..0000000 --- a/core/src/main/scala/kafka/log/package.html +++ /dev/null @@ -1,24 +0,0 @@ - -The log management system for Kafka. - -The entry point for this system is LogManager. LogManager is responsible for holding all the logs, and handing them out by topic/partition. It also handles the enforcement of the -flush policy and retention policies. - -The Log itself is made up of log segments. A log is a FileRecords that contains the data and an OffsetIndex that supports reads by offset on the log. \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 55265a6..9c45326 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -114,7 +114,7 @@ object Defaults { val LogIndexSizeMaxBytes = 10 * 1024 * 1024 val LogIndexIntervalBytes = 4096 val LogFlushIntervalMessages = Long.MaxValue - val LogDeleteDelayMs = 60000 + val LogDeleteDelayMs = 3000 val LogFlushSchedulerIntervalMs = Long.MaxValue val LogFlushOffsetCheckpointIntervalMs = 60000 val LogFlushStartOffsetCheckpointIntervalMs = 60000 diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index 54f8582..1a6202a 100644 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -52,7 +52,8 @@ object StressTestLog { maxProducerIdExpirationMs = 60 * 60 * 1000, producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, brokerTopicStats = new BrokerTopicStats, - logDirFailureChannel = new LogDirFailureChannel(10)) + 
logDirFailureChannel = new LogDirFailureChannel(10), + isFuture = false) val writer = new WriterThread(log) writer.start() val reader = new ReaderThread(log) diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index b385a2a..de6a19d 100644 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -208,7 +208,8 @@ object TestLinearWriteSpeed { class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable { Utils.delete(dir) val log = Log(dir, config, 0L, 0L, scheduler, new BrokerTopicStats, Time.SYSTEM, 60 * 60 * 1000, - LogManager.ProducerIdExpirationCheckIntervalMs, new LogDirFailureChannel(10)) + LogManager.ProducerIdExpirationCheckIntervalMs, new LogDirFailureChannel(10), + isFuture = false) def write(): Int = { log.appendAsLeader(messages, leaderEpoch = 0) messages.sizeInBytes diff --git a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala index b3d4468..0e81232 100644 --- a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala @@ -52,7 +52,8 @@ class ReplicaTest { time = time, maxProducerIdExpirationMs = 60 * 60 * 1000, producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, - logDirFailureChannel = new LogDirFailureChannel(10)) + logDirFailureChannel = new LogDirFailureChannel(10), + isFuture = false) replica = new Replica(brokerId = 0, topicPartition = new TopicPartition("foo", 0), diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala index 0ad5b46..87534a1 100644 --- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala +++ 
b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala @@ -102,7 +102,8 @@ abstract class AbstractLogCleanerIntegrationTest { brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000, producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, - logDirFailureChannel = new LogDirFailureChannel(10)) + logDirFailureChannel = new LogDirFailureChannel(10), + isFuture = false) logMap.put(partition, log) this.logs += log } diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index 1cf393e..ff027d6 100644 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -58,7 +58,8 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000, producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, - logDirFailureChannel = new LogDirFailureChannel(10)) + logDirFailureChannel = new LogDirFailureChannel(10), + isFuture = false) /* append two messages */ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), 0, diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 7455763..3087701 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -289,7 +289,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000, producerIdExpirationCheckIntervalMs = 
LogManager.ProducerIdExpirationCheckIntervalMs, - logDirFailureChannel = new LogDirFailureChannel(10)) + logDirFailureChannel = new LogDirFailureChannel(10), + isFuture = false) log } @@ -297,7 +298,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000, producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, - logDirFailureChannel = new LogDirFailureChannel(10)) + logDirFailureChannel = new LogDirFailureChannel(10), + isFuture = false) private def records(key: Int, value: Int, timestamp: Long) = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, key.toString.getBytes, value.toString.getBytes)) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index b351311..b5e842a 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -1379,7 +1379,8 @@ class LogCleanerTest extends JUnitSuite { Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = recoveryPoint, scheduler = time.scheduler, time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000, producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, - logDirFailureChannel = new LogDirFailureChannel(10)) + logDirFailureChannel = new LogDirFailureChannel(10), + isFuture = false) private def makeCleaner(capacity: Int, checkDone: TopicPartition => Unit = _ => (), maxMessageSize: Int = 64*1024) = new Cleaner(id = 0, diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 8976c68..6635660 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ 
b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -45,7 +45,7 @@ class LogSegmentTest { val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500) val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset)) val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = maxSegmentMs, - maxSegmentBytes = Int.MaxValue, time) + maxSegmentBytes = Int.MaxValue, time,null) segments += seg seg } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 9a9bc61..2189c65 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -66,7 +66,7 @@ class LogTest { } def createEmptyLogs(dir: File, offsets: Int*) { - for(offset <- offsets) { + for (offset <- offsets) { Log.logFile(dir, offset).createNewFile() Log.offsetIndexFile(dir, offset).createNewFile() } @@ -96,6 +96,7 @@ class LogTest { @Test def testTimeBasedLogRoll() { def createRecords = TestUtils.singletonRecords("test".getBytes) + val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L) // create a log @@ -117,7 +118,9 @@ class LogTest { // Append a message with timestamp to a segment whose first message do not have a timestamp. 
val timestamp = mockTime.milliseconds + log.config.segmentMs + 1 + def createRecordsWithTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = timestamp) + log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0) assertEquals("Segment should not have been rolled out because the log rolling should be based on wall clock.", 4, log.numberOfSegments) @@ -278,7 +281,7 @@ class LogTest { override def addSegment(segment: LogSegment): LogSegment = { val wrapper = new LogSegment(segment.log, segment.offsetIndex, segment.timeIndex, segment.txnIndex, segment.baseOffset, - segment.indexIntervalBytes, segment.rollJitterMs, segment.maxSegmentMs, segment.maxSegmentBytes, mockTime) { + segment.indexIntervalBytes, segment.rollJitterMs, segment.maxSegmentMs, segment.maxSegmentBytes, mockTime, null) { override def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long, minOneMessage: Boolean): FetchDataInfo = { @@ -564,6 +567,7 @@ class LogTest { val filtered = ByteBuffer.allocate(2048) records.filterTo(new TopicPartition("foo", 0), new RecordFilter { override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.DELETE_EMPTY + override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING) filtered.flip() @@ -605,6 +609,7 @@ class LogTest { val filtered = ByteBuffer.allocate(2048) records.filterTo(new TopicPartition("foo", 0), new RecordFilter { override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.RETAIN_EMPTY + override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = false }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING) filtered.flip() @@ -648,6 +653,7 @@ class LogTest { val filtered = ByteBuffer.allocate(2048) records.filterTo(new TopicPartition("foo", 0), new RecordFilter { override def checkBatchRetention(batch: 
RecordBatch): BatchRetention = RecordFilter.BatchRetention.DELETE_EMPTY + override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING) filtered.flip() @@ -962,12 +968,14 @@ class LogTest { log.appendAsLeader(record, leaderEpoch = 0) seq = seq + 1 } + // Append an entry with multiple log records. def createRecords = TestUtils.records(List( new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes) ), producerId = pid, producerEpoch = epoch, sequence = seq) + val multiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch = 0) assertEquals("should have appended 3 entries", multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset.get + 1, 3) @@ -1015,6 +1023,7 @@ class LogTest { // Append a duplicate entry with a single records at the tail of the log. This should return the appendInfo of the original entry. 
def createRecordsWithDuplicate = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = seq) + val origAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0) val newAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0) assertEquals("Inserted a duplicate records into the log", origAppendInfo.firstOffset.get, newAppendInfo.firstOffset.get) @@ -1022,7 +1031,7 @@ class LogTest { } @Test - def testMultipleProducerIdsPerMemoryRecord() : Unit = { + def testMultipleProducerIdsPerMemoryRecord(): Unit = { // create a log val log = createLog(logDir, LogConfig()) @@ -1068,7 +1077,7 @@ class LogTest { } @Test - def testDuplicateAppendToFollower() : Unit = { + def testDuplicateAppendToFollower(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) val log = createLog(logDir, logConfig) val epoch: Short = 0 @@ -1089,7 +1098,7 @@ class LogTest { } @Test - def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = { + def testMultipleProducersWithDuplicatesInSingleAppend(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) val log = createLog(logDir, logConfig) @@ -1185,6 +1194,7 @@ class LogTest { @Test def testSizeBasedLogRoll() { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + val setSize = createRecords.sizeInBytes val msgPerSeg = 10 val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages @@ -1218,11 +1228,11 @@ class LogTest { val log = createLog(logDir, logConfig) val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray - for(value <- values) + for (value <- values) log.appendAsLeader(TestUtils.singletonRecords(value = value), leaderEpoch = 0) - for(i <- values.indices) { - val read = log.readUncommitted(i, 100, Some(i+1)).records.batches.iterator.next() + for (i <- 
values.indices) { + val read = log.readUncommitted(i, 100, Some(i + 1)).records.batches.iterator.next() assertEquals("Offset read should match order appended.", i, read.lastOffset) val actual = read.iterator.next() assertNull("Key should be null", actual.key) @@ -1244,9 +1254,9 @@ class LogTest { val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) // now test the case that we give the offsets and use non-sequential offsets - for(i <- records.indices) + for (i <- records.indices) log.appendAsFollower(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, 0, records(i))) - for(i <- 50 until messageIds.max) { + for (i <- 50 until messageIds.max) { val idx = messageIds.indexWhere(_ >= i) val read = log.readUncommitted(i, 100, None).records.records.iterator.next() assertEquals("Offset read should match message id.", messageIds(idx), read.offset) @@ -1266,7 +1276,7 @@ class LogTest { val log = createLog(logDir, logConfig) // keep appending until we have two segments with only a single message in the second segment - while(log.numberOfSegments == 1) + while (log.numberOfSegments == 1) log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0) // now manually truncate off all but one message from the first segment to create a gap in the messages @@ -1279,7 +1289,7 @@ class LogTest { @Test(expected = classOf[KafkaStorageException]) def testLogRollAfterLogHandlerClosed() { val logConfig = LogTest.createLogConfig() - val log = createLog(logDir, logConfig) + val log = createLog(logDir, logConfig) log.closeHandlers() log.roll(1) } @@ -1287,7 +1297,7 @@ class LogTest { @Test def testReadWithMinMessage() { val logConfig = LogTest.createLogConfig(segmentBytes = 72) - val log = createLog(logDir, logConfig) + val log = createLog(logDir, logConfig) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) @@ -1314,7 +1324,7 @@ class LogTest { 
@Test def testReadWithTooSmallMaxLength() { val logConfig = LogTest.createLogConfig(segmentBytes = 72) - val log = createLog(logDir, logConfig) + val log = createLog(logDir, logConfig) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) @@ -1383,14 +1393,14 @@ class LogTest { val log = createLog(logDir, logConfig) val numMessages = 100 val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes, - timestamp = mockTime.milliseconds)) + timestamp = mockTime.milliseconds)) messageSets.foreach(log.appendAsLeader(_, leaderEpoch = 0)) log.flush() /* do successive reads to ensure all our messages are there */ var offset = 0L - for(i <- 0 until numMessages) { - val messages = log.readUncommitted(offset, 1024*1024).records.batches + for (i <- 0 until numMessages) { + val messages = log.readUncommitted(offset, 1024 * 1024).records.batches val head = messages.iterator.next() assertEquals("Offsets not equal", offset, head.lastOffset) @@ -1401,12 +1411,12 @@ class LogTest { assertEquals(s"Timestamps not equal at offset $offset", expected.timestamp, actual.timestamp) offset = head.lastOffset + 1 } - val lastRead = log.readUncommitted(startOffset = numMessages, maxLength = 1024*1024, + val lastRead = log.readUncommitted(startOffset = numMessages, maxLength = 1024 * 1024, maxOffset = Some(numMessages + 1)).records assertEquals("Should be no more messages", 0, lastRead.records.asScala.size) // check that rolling the log forced a flushed, the flush is async so retry in case of failure - TestUtils.retry(1000L){ + TestUtils.retry(1000L) { assertTrue("Log role should have forced flush", log.recoveryPoint >= log.activeSegment.baseOffset) } } @@ -1438,12 +1448,12 @@ class LogTest { */ @Test def testThatGarbageCollectingSegmentsDoesntChangeOffset() { - for(messagesToAppend <- List(0, 1, 25)) { + for (messagesToAppend <- List(0, 1, 25)) { logDir.mkdirs() // 
first test a log segment starting at 0 val logConfig = LogTest.createLogConfig(segmentBytes = 100, retentionMs = 0) val log = createLog(logDir, logConfig) - for(i <- 0 until messagesToAppend) + for (i <- 0 until messagesToAppend) log.appendAsLeader(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = mockTime.milliseconds - 10), leaderEpoch = 0) val currOffset = log.logEndOffset @@ -1458,8 +1468,8 @@ class LogTest { assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments()) assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset) assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append", - currOffset, - log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 0).firstOffset.get) + currOffset, + log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 0).firstOffset.get) // cleanup the log log.delete() @@ -1467,7 +1477,7 @@ class LogTest { } /** - * MessageSet size shouldn't exceed the config.segmentSize, check that it is properly enforced by + * MessageSet size shouldn't exceed the config.segmentSize, check that it is properly enforced by * appending a message set larger than the config.segmentSize setting and checking that an exception is thrown. */ @Test @@ -1554,6 +1564,7 @@ class LogTest { case _: RecordTooLargeException => // this is good } } + /** * Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly. 
*/ @@ -1565,7 +1576,7 @@ class LogTest { val indexInterval = 3 * messageSize val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = indexInterval, segmentIndexBytes = 4096) var log = createLog(logDir, logConfig) - for(i <- 0 until numMessages) + for (i <- 0 until numMessages) log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0) assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset) @@ -1574,7 +1585,7 @@ class LogTest { val lastOffset = log.logEndOffset // After segment is closed, the last entry in the time index should be (largest timestamp -> last offset). val lastTimeIndexOffset = log.logEndOffset - 1 - val lastTimeIndexTimestamp = log.activeSegment.largestTimestamp + val lastTimeIndexTimestamp = log.activeSegment.largestTimestamp // Depending on when the last time index entry is inserted, an entry may or may not be inserted into the time index. 
val numTimeIndexEntries = log.activeSegment.timeIndex.entries + { if (log.activeSegment.timeIndex.lastEntry.offset == log.logEndOffset - 1) 0 else 1 @@ -1629,7 +1640,7 @@ class LogTest { val numMessages = 200 val logConfig = LogTest.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) var log = createLog(logDir, logConfig) - for(i <- 0 until numMessages) + for (i <- 0 until numMessages) log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0) val indexFiles = log.logSegments.map(_.offsetIndex.file) val timeIndexFiles = log.logSegments.map(_.timeIndex.file) @@ -1644,7 +1655,7 @@ class LogTest { assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) assertTrue("The index should have been rebuilt", log.logSegments.head.offsetIndex.entries > 0) assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0) - for(i <- 0 until numMessages) { + for (i <- 0 until numMessages) { assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset) if (i == 0) assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset) @@ -1689,21 +1700,21 @@ class LogTest { val numMessages = 200 val logConfig = LogTest.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) var log = createLog(logDir, logConfig) - for(i <- 0 until numMessages) + for (i <- 0 until numMessages) log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0) val indexFiles = log.logSegments.map(_.offsetIndex.file) val timeIndexFiles = log.logSegments.map(_.timeIndex.file) log.close() // corrupt all the index files - for( file <- indexFiles) { + for (file <- indexFiles) { val bw = new BufferedWriter(new FileWriter(file)) bw.write(" ") bw.close() } // corrupt all the 
index files - for( file <- timeIndexFiles) { + for (file <- timeIndexFiles) { val bw = new BufferedWriter(new FileWriter(file)) bw.write(" ") bw.close() @@ -1712,7 +1723,7 @@ class LogTest { // reopen the log log = createLog(logDir, logConfig, recoveryPoint = 200L) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) - for(i <- 0 until numMessages) { + for (i <- 0 until numMessages) { assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset) if (i == 0) assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset) @@ -1728,9 +1739,10 @@ class LogTest { @Test def testTruncateTo() { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + val setSize = createRecords.sizeInBytes val msgPerSeg = 10 - val segmentSize = msgPerSeg * setSize // each segment will be 10 messages + val segmentSize = msgPerSeg * setSize // each segment will be 10 messages // create a log val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize) @@ -1751,8 +1763,8 @@ class LogTest { log.truncateTo(log.logEndOffset + 1) // try to truncate beyond lastOffset assertEquals("Should not change offset but should log error", lastOffset, log.logEndOffset) assertEquals("Should not change log size", size, log.size) - log.truncateTo(msgPerSeg/2) // truncate somewhere in between - assertEquals("Should change offset", log.logEndOffset, msgPerSeg/2) + log.truncateTo(msgPerSeg / 2) // truncate somewhere in between + assertEquals("Should change offset", log.logEndOffset, msgPerSeg / 2) assertTrue("Should change log size", log.size < size) log.truncateTo(0) // truncate the entire log assertEquals("Should change offset", 0, log.logEndOffset) @@ -1784,17 +1796,17 @@ class LogTest { def testIndexResizingAtTruncation() { val setSize = TestUtils.singletonRecords(value = "test".getBytes, timestamp = 
mockTime.milliseconds).sizeInBytes val msgPerSeg = 10 - val segmentSize = msgPerSeg * setSize // each segment will be 10 messages + val segmentSize = msgPerSeg * setSize // each segment will be 10 messages val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = setSize - 1) val log = createLog(logDir, logConfig) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) - for (i<- 1 to msgPerSeg) + for (i <- 1 to msgPerSeg) log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds + i), leaderEpoch = 0) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) mockTime.sleep(msgPerSeg) - for (i<- 1 to msgPerSeg) + for (i <- 1 to msgPerSeg) log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds + i), leaderEpoch = 0) assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments) val expectedEntries = msgPerSeg - 1 @@ -1804,11 +1816,11 @@ class LogTest { log.truncateTo(0) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) - assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList.head.offsetIndex.maxEntries) - assertEquals("The time index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/12, log.logSegments.toList.head.timeIndex.maxEntries) + assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize / 8, log.logSegments.toList.head.offsetIndex.maxEntries) + assertEquals("The time index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize / 12, log.logSegments.toList.head.timeIndex.maxEntries) mockTime.sleep(msgPerSeg) - for (i<- 1 to msgPerSeg) + for (i <- 1 to msgPerSeg) log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds + i), leaderEpoch = 0) assertEquals("There 
should be exactly 1 segment.", 1, log.numberOfSegments) } @@ -1824,6 +1836,7 @@ class LogTest { val bogusTimeIndex2 = Log.timeIndexFile(logDir, 5) def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 1) val log = createLog(logDir, logConfig) @@ -1845,6 +1858,7 @@ class LogTest { @Test def testReopenThenTruncate() { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + // create a log val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000) var log = createLog(logDir, logConfig) @@ -1865,9 +1879,10 @@ class LogTest { @Test def testAsyncDelete() { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000L) + val asyncDeleteMs = 1000 val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000, - retentionMs = 999, fileDeleteDelayMs = asyncDeleteMs) + retentionMs = 999, fileDeleteDelayMs = asyncDeleteMs) val log = createLog(logDir, logConfig) // append some messages to create some segments @@ -1883,9 +1898,9 @@ class LogTest { assertEquals("Only one segment should remain.", 1, log.numberOfSegments) assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) && - segments.forall(_.offsetIndex.file.getName.endsWith(Log.DeletedFileSuffix))) + segments.forall(_.offsetIndex.file.getName.endsWith(Log.DeletedFileSuffix))) assertTrue("The .deleted files should still be there.", segments.forall(_.log.file.exists) && - segments.forall(_.offsetIndex.file.exists)) + segments.forall(_.offsetIndex.file.exists)) assertTrue("The original file should be gone.", 
oldFiles.forall(!_.exists)) // when enough time passes the files should be deleted @@ -1900,6 +1915,7 @@ class LogTest { @Test def testOpenDeletesObsoleteFiles() { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) + val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) var log = createLog(logDir, logConfig) @@ -1932,8 +1948,8 @@ class LogTest { val buffer = ByteBuffer.allocate(512) for (offset <- appendOffsets) { val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, offset, mockTime.milliseconds(), - 1L, 0, 0, false, 0) + TimestampType.LOG_APPEND_TIME, offset, mockTime.milliseconds(), + 1L, 0, 0, false, 0) builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() } @@ -1975,19 +1991,19 @@ class LogTest { val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4) for (magic <- magicVals; compression <- compressionTypes) { val batch = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes), - new SimpleRecord("k2".getBytes, "v2".getBytes), - new SimpleRecord("k3".getBytes, "v3".getBytes)), - magicValue = magic, codec = compression, - baseOffset = firstOffset) + new SimpleRecord("k2".getBytes, "v2".getBytes), + new SimpleRecord("k3".getBytes, "v3".getBytes)), + magicValue = magic, codec = compression, + baseOffset = firstOffset) withClue(s"Magic=$magic, compressionType=$compression") { val exception = intercept[UnexpectedAppendOffsetException] { log.appendAsFollower(records = batch) } assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#firstOffset", - firstOffset, exception.firstOffset) + firstOffset, exception.firstOffset) assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#lastOffset", - firstOffset + 2, exception.lastOffset) + 
firstOffset + 2, exception.lastOffset) } } } @@ -2003,7 +2019,9 @@ class LogTest { def testCorruptLog() { // append some messages to create some segments val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) + def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + val recoveryPoint = 50L for (_ <- 0 until 10) { // create a log and write some messages to it @@ -2064,7 +2082,7 @@ class LogTest { val indexFiles = logDir.listFiles.filter(file => file.getName.contains(".index")) assertEquals(2, indexFiles.length) for (file <- indexFiles) { - val offsetIndex = new OffsetIndex(file, file.getName.replace(".index","").toLong) + val offsetIndex = new OffsetIndex(file, file.getName.replace(".index", "").toLong) assertTrue(offsetIndex.lastOffset >= 0) offsetIndex.close() } @@ -2103,7 +2121,7 @@ class LogTest { val indexFiles = logDir.listFiles.filter(file => file.getName.contains(".index")) assertEquals(2, indexFiles.length) for (file <- indexFiles) { - val offsetIndex = new OffsetIndex(file, file.getName.replace(".index","").toLong) + val offsetIndex = new OffsetIndex(file, file.getName.replace(".index", "").toLong) assertTrue(offsetIndex.lastOffset >= 0) offsetIndex.close() } @@ -2144,7 +2162,7 @@ class LogTest { val indexFiles = logDir.listFiles.filter(file => file.getName.contains(".index")) assertEquals(3, indexFiles.length) for (file <- indexFiles) { - val offsetIndex = new OffsetIndex(file, file.getName.replace(".index","").toLong) + val offsetIndex = new OffsetIndex(file, file.getName.replace(".index", "").toLong) assertTrue(offsetIndex.lastOffset >= 0) offsetIndex.close() } @@ -2314,7 +2332,7 @@ class LogTest { // Simulate recovery right after all new segments have been renamed to .swap. On recovery, existing split operation // is completed and the old segment must be deleted. 
newSegments.reverse.foreach(segment => { - segment.changeFileSuffixes("", Log.SwapFileSuffix) + segment.changeFileSuffixes("", Log.SwapFileSuffix) }) for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) @@ -2381,6 +2399,7 @@ class LogTest { def testCleanShutdownFile() { // append some messages to create some segments val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) + def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) val cleanShutdownFile = createCleanShutdownFile() @@ -2555,6 +2574,7 @@ class LogTest { @Test def testDeleteOldSegments() { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) + val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) val log = createLog(logDir, logConfig) @@ -2605,6 +2625,7 @@ class LogTest { @Test def testLogDeletionAfterClose() { def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) + val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) val log = createLog(logDir, logConfig) @@ -2623,6 +2644,7 @@ class LogTest { @Test def testLogDeletionAfterDeleteRecords() { def createRecords = TestUtils.singletonRecords("test".getBytes) + val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5) val log = createLog(logDir, logConfig) @@ -2655,6 +2677,7 @@ class LogTest { @Test def shouldDeleteSizeBasedSegments() { def createRecords = TestUtils.singletonRecords("test".getBytes) + val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = 
createRecords.sizeInBytes * 10) val log = createLog(logDir, logConfig) @@ -2664,12 +2687,13 @@ class LogTest { log.onHighWatermarkIncremented(log.logEndOffset) log.deleteOldSegments() - assertEquals("should have 2 segments", 2,log.numberOfSegments) + assertEquals("should have 2 segments", 2, log.numberOfSegments) } @Test def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() { def createRecords = TestUtils.singletonRecords("test".getBytes) + val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15) val log = createLog(logDir, logConfig) @@ -2679,12 +2703,13 @@ class LogTest { log.onHighWatermarkIncremented(log.logEndOffset) log.deleteOldSegments() - assertEquals("should have 3 segments", 3,log.numberOfSegments) + assertEquals("should have 3 segments", 3, log.numberOfSegments) } @Test def shouldDeleteTimeBasedSegmentsReadyToBeDeleted() { def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = 10) + val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000) val log = createLog(logDir, logConfig) @@ -2700,6 +2725,7 @@ class LogTest { @Test def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() { def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = mockTime.milliseconds) + val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000000) val log = createLog(logDir, logConfig) @@ -2715,6 +2741,7 @@ class LogTest { @Test def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() { def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L) + val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact") val log = createLog(logDir, logConfig) @@ -2734,6 +2761,7 @@ class LogTest { @Test def 
shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() { def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L) + val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact,delete") val log = createLog(logDir, logConfig) @@ -2764,7 +2792,7 @@ class LogTest { //Then leader epoch should be set on messages for (i <- records.indices) { - val read = log.readUncommitted(i, 100, Some(i+1)).records.batches.iterator.next() + val read = log.readUncommitted(i, 100, Some(i + 1)).records.batches.iterator.next() assertEquals("Should have set leader epoch", 72, read.partitionLeaderEpoch) } } @@ -2777,7 +2805,7 @@ class LogTest { //Given each message has an offset & epoch, as msgs from leader would def recordsForEpoch(i: Int): MemoryRecords = { val recs = MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)) - recs.batches.asScala.foreach{record => + recs.batches.asScala.foreach { record => record.setPartitionLeaderEpoch(42) record.setLastOffset(i) } @@ -2796,6 +2824,7 @@ class LogTest { @Test def shouldTruncateLeaderEpochsWhenDeletingSegments() { def createRecords = TestUtils.singletonRecords("test".getBytes) + val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) val log = createLog(logDir, logConfig) val cache = epochCache(log) @@ -2821,6 +2850,7 @@ class LogTest { @Test def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() { def createRecords = TestUtils.singletonRecords("test".getBytes) + val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) val log = createLog(logDir, logConfig) val cache = epochCache(log) @@ -2846,6 +2876,7 @@ class LogTest { @Test def shouldTruncateLeaderEpochFileWhenTruncatingLog() { def createRecords = 
TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords.sizeInBytes) val log = createLog(logDir, logConfig) val cache = epochCache(log) @@ -3397,7 +3428,7 @@ class LogTest { assertEquals(new AbortedTransaction(pid, 0), fetchDataInfo.abortedTransactions.get.head) } - private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns) + private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns) private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Int => Unit = { var sequence = 0 @@ -3555,11 +3586,13 @@ object LogTest { time = time, maxProducerIdExpirationMs = maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs, - logDirFailureChannel = new LogDirFailureChannel(10)) + logDirFailureChannel = new LogDirFailureChannel(10), + isFuture = false) } /** * Check if the given log contains any segment with records that cause offset overflow. 
+ * * @param log Log to check * @return true if log contains at least one segment with offset overflow; false otherwise */ diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 01f0010..4a447f0 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -39,7 +39,8 @@ class DumpLogSegmentsTest { val log = Log(logDir, LogConfig(), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000, producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, - logDirFailureChannel = new LogDirFailureChannel(10)) + logDirFailureChannel = new LogDirFailureChannel(10), + isFuture = false) /* append two messages */ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 0,