diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index db8eec3378cbb7d0faca708798971f77efbb123a..2e90d980d1fe4d0316bec1639c57d3753ad9b794 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -143,6 +143,14 @@ private[kafka] class LogManager(val config: KafkaConfig, * Create a log for the given topic and the given partition */ private def createLog(topic: String, partition: Int): Log = { + if (topic.length <= 0) + throw new InvalidTopicException("topic name can't be emtpy") + if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) { + val error = "Wrong partition %d, valid partitions (0, %d)." + .format(partition, (topicPartitionsMap.getOrElse(topic, numPartitions) - 1)) + warn(error) + throw new InvalidPartitionException(error) + } logCreationLock synchronized { val d = new File(logDir, topic + "-" + partition) d.mkdirs()