From e4a86709d07030c44f077ab20d4329ddb84c4aec Mon Sep 17 00:00:00 2001
From: Anton Karamanov
Date: Tue, 15 Jul 2014 21:42:15 +0400
Subject: [PATCH] KAFKA-1414 Speedup broker startup after hard reset and shutdown; patched by Jay Kreps, Alexey Ozeritskiy, Dmitry Bugaychenko and Anton Karamanov

---
 config/server.properties                           |   6 +
 core/src/main/scala/kafka/log/LogConfig.scala      |  30 +++-
 core/src/main/scala/kafka/log/LogManager.scala     | 164 +++++++++++++++------
 core/src/main/scala/kafka/server/KafkaConfig.scala |   6 +
 core/src/main/scala/kafka/server/KafkaServer.scala |   4 +-
 5 files changed, 157 insertions(+), 53 deletions(-)

diff --git a/config/server.properties b/config/server.properties
index c9e923a..921657a 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -62,6 +62,12 @@ log.dirs=/tmp/kafka-logs
 # the brokers.
 num.partitions=2
 
+# The number of threads to be used when recovering unflushed segments of logs
+#log.recovery.threads=1
+
+# The number of threads to be used when shutting down
+#log.shutdown.threads=1
+
 ############################# Log Flush Policy #############################
 
 # Messages are immediately written to the filesystem but by default we only fsync() to sync
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 5746ad4..af4845b 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -36,6 +36,8 @@ object Defaults {
   val MinCleanableDirtyRatio = 0.5
   val Compact = false
   val UncleanLeaderElectionEnable = true
+  val RecoveryThreads = 1
+  val ShutdownThreads = 1
 }
 
 /**
@@ -54,6 +56,8 @@ object Defaults {
  * @param compact Should old segments in this log be deleted or deduplicated?
  * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property
  *                                    but included here for topic-specific configuration validation purposes
+ * @param recoveryThreads The number of threads to be used when recovering unflushed segments
+ * @param shutdownThreads The number of threads to be used during shutdown
  */
 case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
                      val segmentMs: Long = Defaults.SegmentMs,
@@ -68,8 +72,10 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
                      val deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
                      val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
                      val compact: Boolean = Defaults.Compact,
-                     val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable) {
-
+                     val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
+                     val recoveryThreads: Int = Defaults.RecoveryThreads,
+                     val shutdownThreads: Int = Defaults.ShutdownThreads) {
+
   def toProps: Properties = {
     val props = new Properties()
     import LogConfig._
@@ -87,9 +93,11 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
     props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
     props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
     props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
+    props.put(RecoveryThreads, recoveryThreads.toString)
+    props.put(ShutdownThreads, shutdownThreads.toString)
     props
   }
-
+
 }
 
 object LogConfig {
@@ -107,6 +115,8 @@ object LogConfig {
   val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
   val CleanupPolicyProp = "cleanup.policy"
   val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
+  val RecoveryThreads = "recovery.threads"
+  val ShutdownThreads = "shutdown.threads"
 
   val ConfigNames = Set(SegmentBytesProp,
                         SegmentMsProp,
@@ -121,9 +131,11 @@ object LogConfig {
                         DeleteRetentionMsProp,
                         MinCleanableDirtyRatioProp,
                         CleanupPolicyProp,
-                        UncleanLeaderElectionEnableProp)
-
-
+                        UncleanLeaderElectionEnableProp,
+                        RecoveryThreads,
+                        ShutdownThreads)
+
+
   /**
    * Parse the given properties instance into a LogConfig object
    */
@@ -144,9 +156,11 @@ object LogConfig {
                   compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete")
                             .trim.toLowerCase != "delete",
                   uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp,
-                    Defaults.UncleanLeaderElectionEnable.toString).toBoolean)
+                    Defaults.UncleanLeaderElectionEnable.toString).toBoolean,
+                  recoveryThreads = props.getProperty(RecoveryThreads, Defaults.RecoveryThreads.toString).toInt,
+                  shutdownThreads = props.getProperty(ShutdownThreads, Defaults.ShutdownThreads.toString).toInt)
   }
-
+
   /**
    * Create a log config instance using the given properties and defaults
    */
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 1946c94..98abb92 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -23,6 +23,7 @@ import kafka.utils._
 import scala.collection._
 import kafka.common.{TopicAndPartition, KafkaException}
 import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint}
+import java.util.concurrent.{Executors, ExecutorService, ExecutionException}
 
 /**
  * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -101,35 +102,60 @@ class LogManager(val logDirs: Array[File],
   /**
    * Recover and load all logs in the given data directories
    */
-  private def loadLogs(dirs: Seq[File]) {
-    for(dir <- dirs) {
-      val recoveryPoints = this.recoveryPointCheckpoints(dir).read
-      /* load the logs */
-      val subDirs = dir.listFiles()
-      if(subDirs != null) {
-        val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
-        if(cleanShutDownFile.exists())
-          info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
-        else
-          brokerState.newState(RecoveringFromUncleanShutdown)
-
-        for(dir <- subDirs) {
-          if(dir.isDirectory) {
-            info("Loading log '" + dir.getName + "'")
-            val topicPartition = Log.parseTopicPartitionName(dir.getName)
-            val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
-            val log = new Log(dir,
-                              config,
-                              recoveryPoints.getOrElse(topicPartition, 0L),
-                              scheduler,
-                              time)
-            val previous = this.logs.put(topicPartition, log)
-            if(previous != null)
-              throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
-          }
+  private def loadLogs(dirs: Seq[File]): Unit = {
+    var pool: ExecutorService = null
+
+    try {
+      val jobs = dirs.map { dir =>
+        Utils.runnable { () =>
+          loadDir(dir)
         }
-        cleanShutDownFile.delete()
       }
+      val poolSize = defaultConfig.recoveryThreads
+      pool = Executors.newFixedThreadPool(poolSize)
+      jobs.map(pool.submit).foreach(_.get)
+    } catch {
+      case e: ExecutionException => {
+        error("There was an error in one of the threads during logs loading: " + e.getCause)
+        throw e.getCause
+      }
+    } finally {
+      pool.shutdown()
+    }
+  }
+
+  private def loadDir(dir: File): Unit = {
+    val recoveryPoints = this.recoveryPointCheckpoints(dir).read
+    /* load the logs */
+    val subDirs = dir.listFiles()
+    if (subDirs != null) {
+      val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
+      if (cleanShutDownFile.exists()) {
+        info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
+      } else {
+        brokerState.newState(RecoveringFromUncleanShutdown)
+      }
+
+      for (dir <- subDirs if dir.isDirectory) {
+        info("Loading log '" + dir.getName + "'")
+        val topicPartition = Log.parseTopicPartitionName(dir.getName)
+        val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
+        val log = new Log(dir,
+                          config,
+                          recoveryPoints.getOrElse(topicPartition, 0L),
+                          scheduler,
+                          time)
+        val previous = addLogWithLock(topicPartition, log)
+        if(previous != null)
+          throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+      }
+      cleanShutDownFile.delete()
+    }
+  }
+
+  private def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
+    logCreationOrDeletionLock synchronized {
+      this.logs.put(topicPartition, log)
     }
   }
 
@@ -160,31 +186,67 @@ class LogManager(val logDirs: Array[File],
     if(cleanerConfig.enableCleaner)
       cleaner.startup()
   }
-
+
   /**
    * Close all the logs
    */
   def shutdown() {
     info("Shutting down.")
+
     try {
       // stop the cleaner first
-      if(cleaner != null)
+      if (cleaner != null) {
         Utils.swallow(cleaner.shutdown())
-      // flush the logs to ensure latest possible recovery point
-      allLogs.foreach(_.flush())
-      // close the logs
-      allLogs.foreach(_.close())
-      // update the last flush point
-      checkpointRecoveryPointOffsets()
-      // mark that the shutdown was clean by creating the clean shutdown marker file
-      logDirs.foreach(dir => Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile()))
+      }
+
+      var pool: ExecutorService = null
+
+      // close logs in each dir
+      try {
+        val jobs = this.logDirs.map { dir =>
+          Utils.runnable { () =>
+            val logs = logsByDir.getOrElse(dir.toString, Map()).values
+
+            // flush the logs to ensure latest possible recovery point
+            info("Flushing logs at " + dir)
+            logs.foreach(_.flush())
+
+            // close the logs
+            info("Closing logs at " + dir)
+            logs.foreach(_.close())
+
+            // update the last flush point
+            info("Updating recovery points " + dir)
+
+            // checkpoint recovery point offsets of all logs in current dir
+            checkpointLogsInDir(dir)
+
+            // mark that the shutdown was clean by creating the clean shutdown marker file
+            info("Writing clean shutdown marker " + dir)
+            Utils.swallow(new File(dir.toString, Log.CleanShutdownFile).createNewFile())
+          }
+        }
+
+        val poolSize = defaultConfig.shutdownThreads
+        pool = Executors.newFixedThreadPool(poolSize)
+        jobs.map(pool.submit).foreach(_.get)
+      } catch {
+        case e: ExecutionException => {
+          error("There was an error in one of the threads during shutdown: " + e.getCause)
+          throw e.getCause
+        }
+      } finally {
+        pool.shutdown()
+      }
     } finally {
       // regardless of whether the close succeeded, we need to unlock the data directories
       dirLocks.foreach(_.destroy())
     }
+    info("Shutdown complete.")
   }
+
   /**
    * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
    *
@@ -230,12 +292,17 @@ class LogManager(val logDirs: Array[File],
    * to avoid recovering the whole log on startup.
    */
   def checkpointRecoveryPointOffsets() {
-    val recoveryPointsByDir = this.logsByTopicPartition.groupBy(_._2.dir.getParent.toString)
-    for(dir <- logDirs) {
-      val recoveryPoints = recoveryPointsByDir.get(dir.toString)
-      if(recoveryPoints.isDefined)
-        this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
-    }
+    this.logDirs.foreach(checkpointLogsInDir(_))
+  }
+
+  /**
+   * Make a checkpoint for all logs in provided directory.
+   */
+  def checkpointLogsInDir(dir: File): Unit = {
+    for {
+      recoveryPoints <- this.logsByDir.get(dir.toString)
+      recoveryCheckpoints <- this.recoveryPointCheckpoints.get(dir)
+    } recoveryCheckpoints.write(recoveryPoints.mapValues(_.recoveryPoint))
   }
 
   /**
@@ -366,13 +433,22 @@ class LogManager(val logDirs: Array[File],
    * Get all the partition logs
    */
   def allLogs(): Iterable[Log] = logs.values
-
+
   /**
    * Get a map of TopicAndPartition => Log
    */
   def logsByTopicPartition = logs.toMap
 
   /**
+   * Map of log dir to logs by topic and partitions in that dir
+   */
+  private def logsByDir = {
+    this.logsByTopicPartition.groupBy {
+      case (_, log) => log.dir.getParent
+    }
+  }
+
+  /**
    * Flush any log which has exceeded its flush interval and has unwritten messages.
    */
   private def flushDirtyLogs() = {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index bb2e654..801033a 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -184,6 +184,12 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the frequency with which we update the persistent record of the last flush which acts as the log recovery point */
   val logFlushOffsetCheckpointIntervalMs = props.getIntInRange("log.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue))
 
+  /* the number of threads to be used when recovering unflushed segments */
+  val logRecoveryThreads = props.getIntInRange("log.recovery.threads", 1, (1, Int.MaxValue))
+
+  /* the number of threads to be used during shutdown */
+  val logShutdownThreads = props.getIntInRange("log.shutdown.threads", 1, (1, Int.MaxValue))
+
   /* enable auto creation of topic on the server */
   val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5a56f57..73b032b 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -286,7 +286,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                      deleteRetentionMs = config.logCleanerDeleteRetentionMs,
                                      fileDeleteDelayMs = config.logDeleteDelayMs,
                                      minCleanableRatio = config.logCleanerMinCleanRatio,
-                                     compact = config.logCleanupPolicy.trim.toLowerCase == "compact")
+                                     compact = config.logCleanupPolicy.trim.toLowerCase == "compact",
+                                     recoveryThreads = config.logRecoveryThreads,
+                                     shutdownThreads = config.logShutdownThreads)
     val defaultProps = defaultLogConfig.toProps
     val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
     // read the log configurations from zookeeper
-- 
2.0.1
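
A minimal standalone Scala sketch (not part of the patch) of the per-directory thread-pool pattern the new loadLogs() and shutdown() code above applies: submit one job per log directory on a fixed-size pool, wait for all of them, and rethrow the underlying cause if any job fails. The object name ParallelDirJobs, the runPerDir helper and the /tmp/kafka-logs path are illustrative assumptions only; the patch itself builds its jobs with Utils.runnable and sizes the pool from log.recovery.threads and log.shutdown.threads, both of which default to 1.

import java.io.File
import java.util.concurrent.{Executors, ExecutionException}

object ParallelDirJobs {
  // Run one job per data directory on a fixed-size pool and wait for completion.
  def runPerDir(dirs: Seq[File], threads: Int)(job: File => Unit): Unit = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = dirs.map { dir =>
        pool.submit(new Runnable { def run(): Unit = job(dir) })
      }
      // get() blocks until each directory has been processed and surfaces
      // failures as ExecutionException, mirroring the patch's error handling
      futures.foreach(_.get())
    } catch {
      case e: ExecutionException => throw e.getCause
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val dirs = Seq(new File("/tmp/kafka-logs"))  // hypothetical data directory
    runPerDir(dirs, threads = 2) { dir =>
      println("Processing " + dir.getAbsolutePath)
    }
  }
}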