diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 2a4d9539bab..b35ba214bf1 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -803,7 +803,7 @@ class LogManager(logDirs: Seq[File], if (preferredLogDir != null) List(new File(preferredLogDir)) else - nextLogDirs() + nextLogDirs(topicPartition) } val logDirName = { @@ -1068,12 +1068,14 @@ class LogManager(logDirs: Seq[File], * Currently this is done by calculating the number of partitions in each directory and then sorting the * data directories by fewest partitions. */ - private def nextLogDirs(): List[File] = { + private def nextLogDirs(topicPartition: TopicPartition): List[File] = { if(_liveLogDirs.size == 1) { List(_liveLogDirs.peek()) } else { // count the number of logs in each parent directory (including 0 for empty directories - val logCounts = allLogs.groupBy(_.parentDir).map { case (parent, logs) => parent -> logs.size } + val logCounts = logsByTopic(topicPartition.topic) + .groupBy(_.parentDir) + .map { case (parent, logs) => parent -> logs.size } val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap val dirCounts = (zeros ++ logCounts).toBuffer