Index: core/src/main/scala/kafka/log/LogManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala (revision 81900d0ba0a5839a1e5dc876897bab1c24b3bd94)
+++ core/src/main/scala/kafka/log/LogManager.scala (revision baf673943cceba705350a81117de6f471c308de8)
@@ -690,7 +690,7 @@
           if (preferredLogDir != null)
             List(new File(preferredLogDir))
           else
-            nextLogDirs()
+            nextLogDirs(topicPartition)
         }
 
         val logDirName = {
@@ -884,24 +884,51 @@
     removedLog
   }
 
+  /** Log counts for one log directory: logs of the topic being placed, and logs overall. */
+  case class AllLogCount(topicCount: Int, dirCount: Int)
+
+  /**
+   * Orders the candidate log directories for a new partition of the given topic.
+   * Directories holding the fewest logs of `topicPartition.topic` come first; ties are broken by the
+   * fewest logs overall. Live log directories that hold no logs are counted as 0 for both.
+   *
+   * @param topicPartition the partition to place; only its topic is used
+   * @return the parent directories of all logs plus every live log directory, most preferred first
+   */
+  private def nextLogDirByTopic(topicPartition: TopicPartition): List[File] = {
+    // read allLogs once and reuse it for both counts
+    val logs = allLogs
+    val topicLogs = logs.filter(_.topicPartition.topic == topicPartition.topic)
+
+    // number of logs in each parent directory: for this topic only, and overall
+    val topicLogCounts = topicLogs.groupBy(_.dir.getParent).map { case (dir, ls) => dir -> ls.size }
+    val logCounts = logs.groupBy(_.dir.getParent).map { case (dir, ls) => dir -> ls.size }
+
+    // include 0 for live directories that hold no logs
+    val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap
+    val dirCounts = (zeros ++ logCounts).toBuffer
+
+    val allLogCounts = dirCounts.map { case (dir, dirCount) =>
+      (dir, AllLogCount(topicLogCounts.getOrElse(dir, 0), dirCount))
+    }
+
+    // fewest logs of this topic first, ties broken by fewest logs overall (the sort is stable)
+    allLogCounts.sortBy { case (_, count) => (count.topicCount, count.dirCount) }
+      .map { case (dir, _) => new File(dir) }
+      .toList
+  }
+
   /**
    * Provides the full ordered list of suggested directories for the next partition.
-   * Currently this is done by calculating the number of partitions in each directory and then sorting the
-   * data directories by fewest partitions.
+   * With a single live log directory, that directory is the only suggestion. Otherwise the directories
+   * are ordered by [[nextLogDirByTopic]]: fewest partitions of the given topic first, ties broken by
+   * fewest partitions overall.
    */
-  private def nextLogDirs(): List[File] = {
+  private def nextLogDirs(topicPartition: TopicPartition): List[File] = {
     if(_liveLogDirs.size == 1) {
       List(_liveLogDirs.peek())
     } else {
-      // count the number of logs in each parent directory (including 0 for empty directories
-      val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
-      val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap
-      val dirCounts = (zeros ++ logCounts).toBuffer
-
-      // choose the directory with the least logs in it
-      dirCounts.sortBy(_._2).map {
-        case (path: String, _: Int) => new File(path)
-      }.toList
+      nextLogDirByTopic(topicPartition)
     }
   }
 