diff --git core/src/main/scala/kafka/log/Log.scala core/src/main/scala/kafka/log/Log.scala index 4fae2f0..9584869 100644 --- core/src/main/scala/kafka/log/Log.scala +++ core/src/main/scala/kafka/log/Log.scala @@ -833,8 +833,20 @@ object Log { * Parse the topic and partition out of the directory name of a log */ def parseTopicPartitionName(name: String): TopicAndPartition = { + if (name == null || name.isEmpty || !name.contains('-')) { + throwException(name) + } val index = name.lastIndexOf('-') - TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt) + val topic: String = name.substring(0, index) + val partition: String = name.substring(index + 1) + if (topic.length < 1 || partition.length < 1) { + throwException(name) + } + TopicAndPartition(topic, partition.toInt) + } + + def throwException(name: String) { + throw new KafkaException("Log directory [" + name + "] is not in the form of topic-partition") } } diff --git core/src/test/scala/unit/kafka/log/LogTest.scala core/src/test/scala/unit/kafka/log/LogTest.scala index d670ba7..55aef61 100644 --- core/src/test/scala/unit/kafka/log/LogTest.scala +++ core/src/test/scala/unit/kafka/log/LogTest.scala @@ -688,4 +688,77 @@ class LogTest extends JUnitSuite { assertEquals(recoveryPoint, log.logEndOffset) cleanShutdownFile.delete() } + + @Test + def testParseTopicPartitionName() { + val topic: String = "test_topic" + val partition:String = "143" + val topicAndPartition = Log.parseTopicPartitionName(topicPartitionName(topic, partition)); + assertEquals(topic, topicAndPartition.asTuple._1) + assertEquals(partition.toInt, topicAndPartition.asTuple._2) + } + + @Test + def testParseTopicPartitionNameForEmptyName() { + try { + val dirName: String = "" + val topicAndPartition = Log.parseTopicPartitionName(dirName); + fail("KafkaException should have been thrown for dir: " + dirName) + } catch { + case e: Exception => // its GOOD! + } + } + + @Test + def testParseTopicPartitionNameForNull() { + try { + val dirName: String = null + val topicAndPartition = Log.parseTopicPartitionName(dirName); + fail("KafkaException should have been thrown for dir: " + dirName) + } catch { + case e: Exception => // its GOOD! + } + } + + @Test + def testParseTopicPartitionNameForMissingSeparator() { + val topic: String = "test_topic" + val partition:String = "1999" + try { + val topicAndPartition = Log.parseTopicPartitionName(topic + partition); + fail("KafkaException should have been thrown for dir: " + topic + partition) + } catch { + case e: Exception => // its GOOD! + } + } + + @Test + def testParseTopicPartitionNameForMissingTopic() { + val topic: String = "" + val partition:String = "1999" + try { + val name: String = topicPartitionName(topic, partition) + val topicAndPartition = Log.parseTopicPartitionName(name); + fail("KafkaException should have been thrown for dir: " + name) + } catch { + case e: Exception => // its GOOD! + } + } + + @Test + def testParseTopicPartitionNameForMissingPartition() { + val topic: String = "test_topic" + val partition:String = "" + try { + val name: String = topicPartitionName(topic, partition) + val topicAndPartition = Log.parseTopicPartitionName(name); + fail("KafkaException should have been thrown for dir: " + name) + } catch { + case e: Exception => // its GOOD! + } + } + + def topicPartitionName(topic: String, partition: String): String = { + topic + "-" + partition + } }