core/src/main/scala/kafka/log/Log.scala

  private def loadSegments(): Long = {
    // first do a pass through the files in the log directory and remove any temporary files
    // and find any interrupted swap operations
    val swapDirs = removeTempFilesAndCollectSwapFiles()

    // Now do a second pass and load all the log and index files.
    // We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
    // this happens, restart loading segment files from scratch.
    retryOnOffsetOverflow {
      // In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
      // loading of segments. In that case, we also need to close all segments that could have been left open in previous
      // call to loadSegmentFiles().
      logSegments.foreach(_.close())
      segments.clear()
      loadSegmentFiles()
    }

    // Finally, complete any interrupted swap operations. To be crash-safe,
    // log files that are replaced by the swap segment should be renamed to .deleted
    // before the swap file is restored as the new segment file.
    completeSwapOperations(swapDirs)

    if (logSegments.isEmpty) {
      // no existing segments, create a new mutable segment beginning at offset 0
      addSegment(LogSegment.open(segDir = LogSegment.getSegmentDir(dir, logStartOffset),
        baseOffset = 0,
        config,
        time = time,
        fileAlreadyExists = false,
        initFileSize = this.initFileSize,
        preallocate = config.preallocate))
      0
-   } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
+   } else if (LogSegment.getStatus(dir) != SegmentStatus.DELETED) {
      val nextOffset = retryOnOffsetOverflow {
        recoverLog()
      }

    deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
  }

-  def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
+  def isFuture: Boolean = LogSegment.getStatus(dir) == SegmentStatus.Future

  def apply(dir: File,
            config: LogConfig,
            logStartOffset: Long,
            recoveryPoint: Long,
            scheduler: Scheduler,
            brokerTopicStats: BrokerTopicStats,
            time: Time = Time.SYSTEM,
            maxProducerIdExpirationMs: Int,
            producerIdExpirationCheckIntervalMs: Int,
-           logDirFailureChannel: LogDirFailureChannel): Log = {
+           logDirFailureChannel: LogDirFailureChannel,
+           isFuture: Boolean): Log = {
    val topicPartition = Log.parseTopicPartitionName(dir)
    val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
+   if (isFuture) {
+     SegmentStatusHandler.setStatus(new File(dir, SegmentFile.STATUS.getName), SegmentStatus.Future)
+   }
    new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs,
      producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager, logDirFailureChannel)
  }
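The new code above leans on LogSegment.getStatus and SegmentStatusHandler.setStatus (via a SegmentFile.STATUS marker), none of which appear in this excerpt. A minimal sketch of what such a status-file helper could look like, assuming the status is persisted as its integer id in a small marker file inside the partition directory; the file name, the plain-overwrite write path, and the HOT default for unmarked directories are assumptions of this sketch, not the patch's actual implementation:

import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import kafka.log.SegmentStatus

object SegmentStatusFileSketch {
  // Assumed marker file name; the patch refers to it through SegmentFile.STATUS.getName.
  val StatusFileName = "partition.status"

  // Persist the status as its integer id. A real implementation would write a temp file
  // and atomically rename it into place so a crash never leaves a half-written marker.
  def setStatus(dir: File, status: SegmentStatus): Unit = {
    val statusFile = new File(dir, StatusFileName)
    Files.write(statusFile.toPath, String.valueOf(status.getStatus).getBytes(StandardCharsets.UTF_8))
  }

  // Read the status back. A directory without a marker is treated as HOT here so that
  // pre-upgrade log directories remain loadable (an assumption of this sketch).
  def getStatus(dir: File): SegmentStatus = {
    val statusFile = new File(dir, StatusFileName)
    if (!statusFile.exists())
      SegmentStatus.HOT
    else {
      val id = new String(Files.readAllBytes(statusFile.toPath), StandardCharsets.UTF_8).trim.toInt
      SegmentStatus.getStatus(id)
    }
  }
}

Keeping the status inside the directory is what lets the patch drop the Log.DeleteDirSuffix / Log.FutureDirSuffix naming scheme that the replaced renameDir calls relied on.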

core/src/main/scala/kafka/log/LogManager.scala

  private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
+ private val tpToBeDeleted = new CopyOnWriteArraySet[TopicPartition]()

  private def addLogToBeDeleted(log: Log): Unit = {
    this.logsToBeDeleted.add((log, time.milliseconds()))
+   this.tpToBeDeleted.add(log.topicPartition)
  }

  private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
    debug("Loading log '" + logDir.getName + "'")
    val topicPartition = Log.parseTopicPartitionName(logDir)
    val config = topicConfigs.getOrElse(topicPartition.topic, currentDefaultConfig)
    val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
    val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)

    val log = Log(
      dir = logDir,
      config = config,
      logStartOffset = logStartOffset,
      recoveryPoint = logRecoveryPoint,
      maxProducerIdExpirationMs = maxPidExpirationMs,
      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
      scheduler = scheduler,
      time = time,
      brokerTopicStats = brokerTopicStats,
-     logDirFailureChannel = logDirFailureChannel)
+     logDirFailureChannel = logDirFailureChannel,
+     isFuture = false)

-   if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
+   if (LogSegment.getStatus(logDir) == SegmentStatus.DELETED) {
      addLogToBeDeleted(log)
    } else {
      val previous = {
        if (log.isFuture)
          this.futureLogs.put(topicPartition, log)
        else
          this.currentLogs.put(topicPartition, log)
      }
      if (previous != null) {
        if (log.isFuture)
          throw new IllegalStateException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
        else
          throw new IllegalStateException(s"Duplicate log directories for $topicPartition are found in both ${log.dir.getAbsolutePath} " +
            s"and ${previous.dir.getAbsolutePath}. It is likely because log directory failure happened while broker was " +
            s"replacing current replica with future replica. Recover broker from this failure by manually deleting one of the two directories " +
            s"for this partition. It is recommended to delete the partition in the log directory that is known to have failed recently.")
      }
    }
  }

  def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false, isFuture: Boolean = false): Log = {
    logCreationOrDeletionLock synchronized {
      getLog(topicPartition, isFuture).getOrElse {
+       while (tpToBeDeleted.contains(topicPartition)) {
+         Thread.sleep(500)
+         warn(s"Can not create log for $topicPartition yet because it is still scheduled for deletion; waiting for the deletion to complete")
+       }
        // create the log if it has not already been created in another thread
        if (!isNew && offlineLogDirs.nonEmpty)
          throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")

        val logDir = {
          val preferredLogDir = preferredLogDirs.get(topicPartition)

          if (isFuture) {
            if (preferredLogDir == null)
              throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")
            else if (getLog(topicPartition).get.dir.getParent == preferredLogDir)
              throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")
          }

          if (preferredLogDir != null)
            preferredLogDir
          else
            nextLogDir().getAbsolutePath
        }
        if (!isLogDirOnline(logDir))
          throw new KafkaStorageException(s"Can not create log for $topicPartition because log directory $logDir is offline")

        try {
-         val dir = {
-           if (isFuture)
-             new File(logDir, Log.logFutureDirName(topicPartition))
-           else
-             new File(logDir, Log.logDirName(topicPartition))
-         }
+         val dir = new File(logDir, Log.logDirName(topicPartition))
          Files.createDirectories(dir.toPath)

          val log = Log(
            dir = dir,
            config = config,
            logStartOffset = 0L,
            recoveryPoint = 0L,
            maxProducerIdExpirationMs = maxPidExpirationMs,
            producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
            scheduler = scheduler,
            time = time,
            brokerTopicStats = brokerTopicStats,
-           logDirFailureChannel = logDirFailureChannel)
+           logDirFailureChannel = logDirFailureChannel,
+           isFuture = isFuture)

          if (isFuture)
            futureLogs.put(topicPartition, log)
          else
            currentLogs.put(topicPartition, log)

          info(s"Created log for partition $topicPartition in $logDir with properties " +
            s"{${config.originals.asScala.mkString(", ")}}.")
          // Remove the preferred log dir since it has already been satisfied
          preferredLogDirs.remove(topicPartition)

          log
        } catch {
          case e: IOException =>
            val msg = s"Error while creating log for $topicPartition in dir $logDir"
            logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
            throw new KafkaStorageException(msg, e)
        }
      }
    }
  }

  private def deleteLogs(): Unit = {
    var nextDelayMs = 0L
    try {
      def nextDeleteDelayMs: Long = {
        if (!logsToBeDeleted.isEmpty) {
          val (_, scheduleTimeMs) = logsToBeDeleted.peek()
          scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
        } else
          currentDefaultConfig.fileDeleteDelayMs
      }

      while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
        val (removedLog, _) = logsToBeDeleted.take()
        if (removedLog != null) {
          try {
            removedLog.delete()
+           tpToBeDeleted.remove(removedLog.topicPartition)
            info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
          } catch {
            case e: KafkaStorageException =>
              error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
          }
        }
      }
    } catch {
      case e: Throwable =>
        error(s"Exception in kafka-delete-logs thread.", e)
    } finally {
      try {
        scheduler.schedule("kafka-delete-logs",
          deleteLogs _,
          delay = nextDelayMs,
          unit = TimeUnit.MILLISECONDS)
      } catch {
        case e: Throwable =>
          if (scheduler.isStarted) {
            // No errors should occur unless scheduler has been shutdown
            error(s"Failed to schedule next delete in kafka-delete-logs thread", e)
          }
      }
    }
  }

  def replaceCurrentWithFutureLog(topicPartition: TopicPartition): Unit = {
    logCreationOrDeletionLock synchronized {
      val sourceLog = currentLogs.get(topicPartition)
      val destLog = futureLogs.get(topicPartition)

      if (sourceLog == null)
        throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
      if (destLog == null)
        throw new KafkaStorageException(s"The future replica for $topicPartition is offline")

-     destLog.renameDir(Log.logDirName(topicPartition))
+     LogSegment.setStatus(destLog.dir, SegmentStatus.HOT)

      // Now that future replica has been successfully renamed to be the current replica
      // Update the cached map and log cleaner as appropriate.
      futureLogs.remove(topicPartition)
      currentLogs.put(topicPartition, destLog)
      if (cleaner != null) {
        cleaner.alterCheckpointDir(topicPartition, sourceLog.dir.getParentFile, destLog.dir.getParentFile)
        cleaner.resumeCleaning(topicPartition)
      }

      try {
-       sourceLog.renameDir(Log.logDeleteDirName(topicPartition))
+       LogSegment.setStatus(sourceLog.dir, SegmentStatus.DELETED)
        // Now that replica in source log directory has been successfully renamed for deletion.
        // Close the log, update checkpoint files, and enqueue this log to be deleted
        sourceLog.close()
        checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile)
        checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile)
        addLogToBeDeleted(sourceLog)
      } catch {
        case e: KafkaStorageException =>
          // If sourceLog's log directory is offline, we need close its handlers here.
          // handleLogDirFailure() will not close handlers of sourceLog because it has been removed from currentLogs map
          sourceLog.closeHandlers()
          sourceLog.removeLogMetrics()
          throw e
      }

      info(s"The current replica is successfully replaced with the future replica for $topicPartition")
    }
  }

  def asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false): Log = {
    val removedLog: Log = logCreationOrDeletionLock synchronized {
      if (isFuture)
        futureLogs.remove(topicPartition)
      else
        currentLogs.remove(topicPartition)
    }
    if (removedLog != null) {
      //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
      if (cleaner != null && !isFuture) {
        cleaner.abortCleaning(topicPartition)
        cleaner.updateCheckpoints(removedLog.dir.getParentFile)
      }
-     removedLog.renameDir(Log.logDeleteDirName(topicPartition))
+     LogSegment.setStatus(removedLog.dir, SegmentStatus.DELETED)
      removedLog.close()
      checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
      checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
      addLogToBeDeleted(removedLog)
      info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
    } else if (offlineLogDirs.nonEmpty) {
      throw new KafkaStorageException("Failed to delete log for " + topicPartition +
        " because it may be in one of the offline directories " + offlineLogDirs.mkString(","))
    }
    removedLog
  }
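Aside: tpToBeDeleted exists because, with a status marker instead of a renamed directory, a log scheduled for deletion keeps its original directory name, so getOrCreateLog has to wait rather than recreate the partition on top of a directory still queued in deleteLogs(). A small self-contained sketch of that coordination pattern, with illustrative names only (not code from the patch):

import java.util.concurrent.{CopyOnWriteArraySet, LinkedBlockingQueue}

object PendingDeletionGate {
  // Partitions whose on-disk directory has been marked DELETED but not yet removed.
  private val pendingDeletion = new CopyOnWriteArraySet[String]()
  // Work queue drained by a background deleter, standing in for logsToBeDeleted.
  private val deleteQueue = new LinkedBlockingQueue[String]()

  // Deletion path: record the pending partition, then enqueue the actual work.
  def scheduleDelete(partition: String): Unit = {
    pendingDeletion.add(partition)
    deleteQueue.add(partition)
  }

  // Deleter thread: remove the directory, then clear the pending marker so the
  // partition may be created again.
  def deleteNext(deleteDir: String => Unit): Unit = {
    val partition = deleteQueue.take()
    deleteDir(partition)
    pendingDeletion.remove(partition)
  }

  // Creation path: block until any pending deletion of the same partition has finished,
  // mirroring the 500 ms busy-wait added to LogManager.getOrCreateLog.
  def awaitCreatable(partition: String): Unit = {
    while (pendingDeletion.contains(partition))
      Thread.sleep(500)
  }
}

A per-partition latch or condition variable would avoid polling, but the sketch mirrors the 500 ms busy-wait used in getOrCreateLog above.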
core/src/main/scala/kafka/log/SegmentStatus.java

package kafka.log;

public enum SegmentStatus {
-   HOT(1), DELETED(2), CLEANED(3), SWAP(4), UNKNOWN(5);
+   HOT(1), DELETED(2), CLEANED(3), SWAP(4), UNKNOWN(5), Future(6);

    private final int status;

    SegmentStatus(int status) {
        this.status = status;
    }

    public int getStatus() {
        return status;
    }

    public static SegmentStatus getStatus(int id) {
        switch (id) {
            case 1:
                return HOT;
            case 2:
                return DELETED;
            case 3:
                return CLEANED;
            case 4:
                return SWAP;
            case 5:
                return UNKNOWN;
+           case 6:
+               return Future;
            default:
                throw new RuntimeException("Invalid file status : " + id);
        }
    }
}

core/src/test/scala/other/kafka/StressTestLog.scala

      maxProducerIdExpirationMs = 60 * 60 * 1000,
      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
      brokerTopicStats = new BrokerTopicStats,
-     logDirFailureChannel = new LogDirFailureChannel(10))
+     logDirFailureChannel = new LogDirFailureChannel(10),
+     isFuture = false)
    val writer = new WriterThread(log)
    writer.start()
    val reader = new ReaderThread(log)

core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala

  class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable {
    Utils.delete(dir)
    val log = Log(dir, config, 0L, 0L, scheduler, new BrokerTopicStats, Time.SYSTEM, 60 * 60 * 1000,
-     LogManager.ProducerIdExpirationCheckIntervalMs, new LogDirFailureChannel(10))
+     LogManager.ProducerIdExpirationCheckIntervalMs, new LogDirFailureChannel(10),
+     isFuture = false)
    def write(): Int = {
      log.appendAsLeader(messages, leaderEpoch = 0)
      messages.sizeInBytes

core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala

      time = time,
      maxProducerIdExpirationMs = 60 * 60 * 1000,
      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
-     logDirFailureChannel = new LogDirFailureChannel(10))
+     logDirFailureChannel = new LogDirFailureChannel(10),
+     isFuture = false)
    replica = new Replica(brokerId = 0,
      topicPartition = new TopicPartition("foo", 0),

core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala

      brokerTopicStats = new BrokerTopicStats,
      maxProducerIdExpirationMs = 60 * 60 * 1000,
      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
-     logDirFailureChannel = new LogDirFailureChannel(10))
+     logDirFailureChannel = new LogDirFailureChannel(10),
+     isFuture = false)
    logMap.put(partition, log)
    this.logs += log
  }

core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala

    val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
      time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
-     logDirFailureChannel = new LogDirFailureChannel(10))
+     logDirFailureChannel = new LogDirFailureChannel(10),
+     isFuture = false)

    /* append two messages */
    log.appendAsLeader(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), 0,

core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala

      brokerTopicStats = new BrokerTopicStats,
      maxProducerIdExpirationMs = 60 * 60 * 1000,
      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
-     logDirFailureChannel = new LogDirFailureChannel(10))
+     logDirFailureChannel = new LogDirFailureChannel(10),
+     isFuture = false)
    log
  }

    Log(dir = dir,
      config = config,
      logStartOffset = 0L,
      recoveryPoint = 0L,
      scheduler = time.scheduler,
      time = time,
      brokerTopicStats = new BrokerTopicStats,
      maxProducerIdExpirationMs = 60 * 60 * 1000,
      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
-     logDirFailureChannel = new LogDirFailureChannel(10))
+     logDirFailureChannel = new LogDirFailureChannel(10),
+     isFuture = false)

core/src/test/scala/unit/kafka/log/LogCleanerTest.scala

    Log(dir = dir,
      config = config,
      logStartOffset = 0L,
      recoveryPoint = recoveryPoint,
      scheduler = time.scheduler,
      time = time,
      brokerTopicStats = new BrokerTopicStats,
      maxProducerIdExpirationMs = 60 * 60 * 1000,
      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
-     logDirFailureChannel = new LogDirFailureChannel(10))
+     logDirFailureChannel = new LogDirFailureChannel(10),
+     isFuture = false)

core/src/test/scala/unit/kafka/log/LogTest.scala

      time = time,
      maxProducerIdExpirationMs = maxProducerIdExpirationMs,
      producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
-     logDirFailureChannel = new LogDirFailureChannel(10))
+     logDirFailureChannel = new LogDirFailureChannel(10),
+     isFuture = false)
  }

core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala

    val log = Log(logDir, LogConfig(), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
      time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
-     logDirFailureChannel = new LogDirFailureChannel(10))
+     logDirFailureChannel = new LogDirFailureChannel(10),
+     isFuture = false)
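For reference, a hedged sketch of how the new Log.apply signature might be exercised for a future replica in a test, assuming Kafka's usual test helpers (kafka.utils.MockTime, TestUtils.tempDir) plus the status-file behaviour sketched earlier; the final check only holds if Log.apply and Log.isFuture agree on the same marker file:

import java.io.File
import java.nio.file.Files

import kafka.log.{Log, LogConfig, LogManager}
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils.{MockTime, TestUtils}

object FutureLogCreationSketch {
  def main(args: Array[String]): Unit = {
    val time = new MockTime()
    // Directory name encodes topic "test-topic", partition 0, as Log.parseTopicPartitionName expects.
    val logDir = new File(TestUtils.tempDir(), "test-topic-0")
    Files.createDirectories(logDir.toPath)

    val log = Log(
      dir = logDir,
      config = LogConfig(),
      logStartOffset = 0L,
      recoveryPoint = 0L,
      scheduler = time.scheduler,
      time = time,
      brokerTopicStats = new BrokerTopicStats,
      maxProducerIdExpirationMs = 60 * 60 * 1000,
      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
      logDirFailureChannel = new LogDirFailureChannel(10),
      isFuture = true)

    // With isFuture = true, Log.apply is expected to persist SegmentStatus.Future for this
    // directory, so isFuture is derived from the status marker rather than a directory-name suffix.
    require(log.isFuture, "expected the status marker to identify this log as a future replica")
  }
}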