diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 5746ad4..1ed73ba 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -36,6 +36,7 @@ object Defaults { val MinCleanableDirtyRatio = 0.5 val Compact = false val UncleanLeaderElectionEnable = true + val RecoveryThreads = 1 } /** @@ -54,6 +55,7 @@ object Defaults { * @param compact Should old segments in this log be deleted or deduplicated? * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property * but included here for topic-specific configuration validation purposes + * @param recoveryThreads The number of threads to be used when recovering unflushed segments */ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, val segmentMs: Long = Defaults.SegmentMs, @@ -68,8 +70,9 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, val deleteRetentionMs: Long = Defaults.DeleteRetentionMs, val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio, val compact: Boolean = Defaults.Compact, - val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable) { - + val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable, + val recoveryThreads: Int = Defaults.RecoveryThreads) { + def toProps: Properties = { val props = new Properties() import LogConfig._ @@ -87,9 +90,10 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString) props.put(CleanupPolicyProp, if(compact) "compact" else "delete") props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString) + props.put(RecoveryThreads, recoveryThreads.toString) props } - + } object LogConfig { @@ -107,13 +111,14 @@ object LogConfig { val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio" val CleanupPolicyProp = 
"cleanup.policy" val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable" - - val ConfigNames = Set(SegmentBytesProp, - SegmentMsProp, - SegmentIndexBytesProp, - FlushMessagesProp, - FlushMsProp, - RetentionBytesProp, + val RecoveryThreads = "recovery.threads" + + val ConfigNames = Set(SegmentBytesProp, + SegmentMsProp, + SegmentIndexBytesProp, + FlushMessagesProp, + FlushMsProp, + RetentionBytesProp, RententionMsProp, MaxMessageBytesProp, IndexIntervalBytesProp, @@ -121,9 +126,10 @@ object LogConfig { DeleteRetentionMsProp, MinCleanableDirtyRatioProp, CleanupPolicyProp, - UncleanLeaderElectionEnableProp) - - + UncleanLeaderElectionEnableProp, + RecoveryThreads) + + /** * Parse the given properties instance into a LogConfig object */ @@ -144,9 +150,10 @@ object LogConfig { compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete") .trim.toLowerCase != "delete", uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp, - Defaults.UncleanLeaderElectionEnable.toString).toBoolean) + Defaults.UncleanLeaderElectionEnable.toString).toBoolean, + recoveryThreads = props.getProperty(RecoveryThreads, Defaults.RecoveryThreads.toString).toInt) } - + /** * Create a log config instance using the given properties and defaults */ diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 1946c94..89792b9 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -92,44 +92,72 @@ class LogManager(val logDirs: Array[File], dirs.map { dir => val lock = new FileLock(new File(dir, LockFile)) if(!lock.tryLock()) - throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath + + throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath + ". 
A Kafka instance in another process or thread is using this directory.") lock } } - + /** * Recover and load all logs in the given data directories */ - private def loadLogs(dirs: Seq[File]) { - for(dir <- dirs) { - val recoveryPoints = this.recoveryPointCheckpoints(dir).read - /* load the logs */ - val subDirs = dir.listFiles() - if(subDirs != null) { - val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) - if(cleanShutDownFile.exists()) - info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath)) - else - brokerState.newState(RecoveringFromUncleanShutdown) + private def loadLogs(dirs: Seq[File]): Unit = { + import java.util.concurrent.{Executors, ExecutorService, ExecutionException} + //import scala.collection.JavaConversions._ - for(dir <- subDirs) { - if(dir.isDirectory) { - info("Loading log '" + dir.getName + "'") - val topicPartition = Log.parseTopicPartitionName(dir.getName) - val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) - val log = new Log(dir, - config, - recoveryPoints.getOrElse(topicPartition, 0L), - scheduler, - time) - val previous = this.logs.put(topicPartition, log) - if(previous != null) - throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) - } + var pool: ExecutorService = null + + try { + val jobs = dirs.map { dir => + Utils.runnable { () => + loadDir(dir) } - cleanShutDownFile.delete() } + val poolSize = dirs.size + pool = Executors.newFixedThreadPool(poolSize) + jobs.map(pool.submit).foreach(_.get) + } catch { + case e: ExecutionException => { + error("There was an error in one of the threads during logs loading: " + e.getCause) + throw e.getCause + } + } finally { + if (pool != null) pool.shutdown() + } + } + + private def loadDir(dir: File): Unit = { + val recoveryPoints = this.recoveryPointCheckpoints(dir).read + /* load the logs */ + val subDirs = dir.listFiles() + if (subDirs 
!= null) { + val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) + if (cleanShutDownFile.exists()) { + info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath)) + } else { + brokerState.newState(RecoveringFromUncleanShutdown) + } + + for (dir <- subDirs if dir.isDirectory) { + info("Loading log '" + dir.getName + "'") + val topicPartition = Log.parseTopicPartitionName(dir.getName) + val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) + val log = new Log(dir, + config, + recoveryPoints.getOrElse(topicPartition, 0L), + scheduler, + time) + val previous = addLogWithLock(topicPartition, log) + if(previous != null) + throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) + } + cleanShutDownFile.delete() + } + } + + private def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { + logCreationOrDeletionLock synchronized { + this.logs.put(topicPartition, log) } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index bb2e654..bc30f81 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -184,6 +184,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the frequency with which we update the persistent record of the last flush which acts as the log recovery point */ val logFlushOffsetCheckpointIntervalMs = props.getIntInRange("log.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue)) + /* the number of threads to be used when recovering unflushed segments */ + val logRecoveryThreads = props.getIntInRange("log.recovery.threads", 1, (1, Int.MaxValue)) + /* enable auto creation of topic on the server */ val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)