diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 46237c7dbc5fb8318db772434462003d5b8bbccd..3fe725c0302fba40efd1051de163cc04c6ba2c5a 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -111,6 +111,14 @@ private[kafka] class LogManager(val config: KafkaConfig, * Create a log for the given topic and the given partition */ private def createLog(topic: String, partition: Int): Log = { + if (topic.length <= 0) + throw new InvalidTopicException("topic name can't be emtpy") + if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) { + val error = "Wrong partition %d, valid partitions (0, %d)." + .format(partition, (topicPartitionsMap.getOrElse(topic, numPartitions) - 1)) + warn(error) + throw new InvalidPartitionException(error) + } logCreationLock synchronized { val d = new File(logDir, topic + "-" + partition) d.mkdirs()