From ef57924a3c7b0b4dff22be1ee9730e8cabc42ff3 Mon Sep 17 00:00:00 2001 From: Alexey Ozeritsky Date: Fri, 25 Apr 2014 16:21:02 +0400 Subject: [PATCH] * parallel dir loading --- core/src/main/scala/kafka/log/LogManager.scala | 54 +++++++++++++++----------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 7cee543..cf618e0 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -102,31 +102,39 @@ class LogManager(val logDirs: Array[File], * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { - for(dir <- dirs) { - val recoveryPoints = this.recoveryPointCheckpoints(dir).read - /* load the logs */ - val subDirs = dir.listFiles() - if(subDirs != null) { - val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) - if(cleanShutDownFile.exists()) - info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath)) - for(dir <- subDirs) { - if(dir.isDirectory) { - info("Loading log '" + dir.getName + "'") - val topicPartition = Log.parseTopicPartitionName(dir.getName) - val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) - val log = new Log(dir, - config, - recoveryPoints.getOrElse(topicPartition, 0L), - scheduler, - time) - val previous = this.logs.put(topicPartition, log) - if(previous != null) - throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) - } + dirs.toParArray.foreach(dir => loadDir(dir)) + } + + private def loadDir(dir: File) { + val recoveryPoints = this.recoveryPointCheckpoints(dir).read + /* load the logs */ + val subDirs = dir.listFiles() + if(subDirs != null) { + val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) + if(cleanShutDownFile.exists()) + info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath)) + for(dir <- subDirs) { + if(dir.isDirectory) { + info("Loading log '" + dir.getName + "'") + val topicPartition = Log.parseTopicPartitionName(dir.getName) + val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) + val log = new Log(dir, + config, + recoveryPoints.getOrElse(topicPartition, 0L), + scheduler, + time) + val previous = addLogWithLock(topicPartition, log) + if(previous != null) + throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } - cleanShutDownFile.delete() } + cleanShutDownFile.delete() + } + } + + private def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { + logCreationOrDeletionLock synchronized { + this.logs.put(topicPartition, log) } } -- 1.9.3