From 6f9571afaab353d30c41554dc3948224b90e051a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8E=E6=B5=B7=E6=B4=8B?= Date: Thu, 29 Nov 2018 19:51:52 +0800 Subject: [PATCH] modify select disk strategy when create topic, first count by topic s partition count in each directory and then count by total partitions in each directory; first consider is topic level balance, because topic flow is not equal , such huge flow and small flow --- core/src/main/scala/kafka/log/LogManager.scala | 68 +++++++++++++++++++++----- 1 file changed, 55 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 508dcd0e6..aac2cdfc7 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -689,7 +689,7 @@ class LogManager(logDirs: Seq[File], if (preferredLogDir != null) preferredLogDir else - nextLogDir().getAbsolutePath + nextLogDir(topicPartition.topic).getAbsolutePath } if (!isLogDirOnline(logDir)) throw new KafkaStorageException(s"Can not create log for $topicPartition because log directory $logDir is offline") @@ -866,23 +866,65 @@ class LogManager(logDirs: Seq[File], } /** - * Choose the next directory in which to create a log. Currently this is done - * by calculating the number of partitions in each directory and then choosing the - * data directory with the fewest partitions. - */ - private def nextLogDir(): File = { + * Choose the next directory in which to create a log. + * first it is done by calculating the number of topic partitions in each directory + * second it is done by calculating the number of partitions in each directory and then choosing the + * data directory with the fewest partitions. + */ + private def nextLogDir(topic: String): File = { if(_liveLogDirs.size == 1) { _liveLogDirs.peek() } else { - // count the number of logs in each parent directory (including 0 for empty directories - val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size) - val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap - val dirCounts = (zeros ++ logCounts).toBuffer + // first count the number of logs in each parent directory (including 0 for empty directories + // second count the number of topic logs in each parent directory + nextLogDirByTopic(topic) + } + } - // choose the directory with the least logs in it - val leastLoaded = dirCounts.sortBy(_._2).head - new File(leastLoaded._1) + /** + * calculate the target directory which has fewer target topic's tp, + * if equals, choose fewest partitions directory + * @param topic + * @return target dir file + */ + private def nextLogDirByTopic(topic: String): File = { + val logCountByTopic = mutable.Map[String, Int]() + allLogs.map { log => + if (topic.equals(log.topicPartition.topic)) { + if (logCountByTopic.contains(log.dir.getParent)) { + logCountByTopic.put(log.dir.getParent, logCountByTopic(log.dir.getParent) + 1) + } else { + logCountByTopic.put(log.dir.getParent, 1) + } + } else { + if (!logCountByTopic.contains(log.dir.getParent)) { + logCountByTopic.put(log.dir.getParent, 0) + } + } + } + + val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size) + val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap + val dirCountsByTotalLog = (zeros ++ logCounts).toBuffer + + // dirCounts type (dir, (countByTopic, totalCount)) + val dirCounts = dirCountsByTotalLog.map { dir => + (dir._1, (logCountByTopic.getOrElse(dir._1, 0), dir._2)) } + + val sortedDirs = dirCounts.sortWith((dir1, dir2) => { + val tuple1 = dir1._2 + val tuple2 = dir2._2 + //first sorted by topic count in dir + if (tuple1._1 < tuple2._1) true + else if (tuple1._1 > tuple2._1) false + else { + // second sorted by total tps in dir + tuple1._2 < tuple2._2 + } + }) + + new File(sortedDirs.head._1) } /** -- 2.15.1 (Apple Git-101)