diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 285c033..bf9fa2f 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -19,7 +19,7 @@ package kafka.admin import joptsimple._ import java.util.Properties -import kafka.common.AdminCommandFailedException +import kafka.common.{Topic, AdminCommandFailedException} import kafka.utils._ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException @@ -143,15 +143,21 @@ object TopicCommand { } topics.foreach { topic => try { - ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) - println("Topic %s is marked for deletion.".format(topic)) - println("Note: This will have no impact if delete.topic.enable is not set to true.") + if (Topic.InternalTopics.contains(topic)) { + throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic)); + } else { + ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) + println("Topic %s is marked for deletion.".format(topic)) + println("Note: This will have no impact if delete.topic.enable is not set to true.") + } } catch { case e: ZkNodeExistsException => println("Topic %s is already marked for deletion.".format(topic)) - case e2: Throwable => + case e: AdminOperationException => + throw e + case e: Throwable => throw new AdminOperationException("Error while deleting topic %s".format(topic)) - } + } } } diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index ac6dd20..c7136f2 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -22,7 +22,7 @@ import org.scalatest.junit.JUnit3Suite import kafka.utils.Logging import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness -import kafka.server.KafkaConfig +import kafka.server.{OffsetManager, KafkaConfig} import kafka.admin.TopicCommand.TopicCommandOptions import kafka.utils.ZkUtils @@ -60,4 +60,43 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey)) assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal)) } + + @Test + def testTopicDeletion() { + val normalTopic = "test" + + val numPartitionsOriginal = 1 + + // create brokers + val brokers = List(0, 1, 2) + TestUtils.createBrokersInZk(zkClient, brokers) + + // create the NormalTopic + val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString, + "--replication-factor", "1", + "--topic", normalTopic)) + TopicCommand.createTopic(zkClient, createOpts) + + // delete the NormalTopic + val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic)) + val deletePath = ZkUtils.getDeleteTopicPath(normalTopic) + assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deletePath)) + TopicCommand.deleteTopic(zkClient, deleteOpts) + assertTrue("Delete path for topic should exist after deletion.", zkClient.exists(deletePath)) + + // create the offset topic + val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString, + "--replication-factor", "1", + "--topic", OffsetManager.OffsetsTopicName)) + TopicCommand.createTopic(zkClient, createOffsetTopicOpts) + + // try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't + val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", OffsetManager.OffsetsTopicName)) + val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(OffsetManager.OffsetsTopicName) + assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deleteOffsetTopicPath)) + intercept[AdminOperationException] { + TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts) + } + assertFalse("Delete path for topic shouldn't exist after deletion.", zkClient.exists(deleteOffsetTopicPath)) + } } \ No newline at end of file