From da418ea3f19843ce3264a93640e3f7639999268b Mon Sep 17 00:00:00 2001 From: tmalaska Date: Sun, 22 Feb 2015 16:14:56 -0500 Subject: [PATCH] KAFKA-1961-update 6 --- core/src/main/scala/kafka/admin/TopicCommand.scala | 12 ++++-- .../scala/unit/kafka/admin/TopicCommandTest.scala | 46 +++++++++++++++++++++- 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 285c033..8f3511f 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -19,7 +19,7 @@ package kafka.admin import joptsimple._ import java.util.Properties -import kafka.common.AdminCommandFailedException +import kafka.common.{Topic, AdminCommandFailedException} import kafka.utils._ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException @@ -143,9 +143,13 @@ object TopicCommand { } topics.foreach { topic => try { - ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) - println("Topic %s is marked for deletion.".format(topic)) - println("Note: This will have no impact if delete.topic.enable is not set to true.") + if (Topic.InternalTopics.contains(topic)) { + throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic)); + } else { + ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) + println("Topic %s is marked for deletion.".format(topic)) + println("Note: This will have no impact if delete.topic.enable is not set to true.") + } } catch { case e: ZkNodeExistsException => println("Topic %s is already marked for deletion.".format(topic)) diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index ac6dd20..78dd4ef 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -22,7 +22,7 @@ import org.scalatest.junit.JUnit3Suite import kafka.utils.Logging import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness -import kafka.server.KafkaConfig +import kafka.server.{OffsetManager, KafkaConfig} import kafka.admin.TopicCommand.TopicCommandOptions import kafka.utils.ZkUtils @@ -60,4 +60,48 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey)) assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal)) } + + @Test + def testTopicDeletion() { + val normalTopic = "test" + + val numPartitionsOriginal = 1 + + // create brokers + val brokers = List(0, 1, 2) + TestUtils.createBrokersInZk(zkClient, brokers) + + // create the NormalTopic + val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString, + "--replication-factor", "1", + "--topic", normalTopic)) + TopicCommand.createTopic(zkClient, createOpts) + + // delete the NormalTopic + val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic)) + val deletePath = ZkUtils.getDeleteTopicPath(normalTopic) + assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deletePath)) + TopicCommand.deleteTopic(zkClient, deleteOpts) + assertTrue("Delete path for topic should exist after deletion.", zkClient.exists(deletePath)) + + // create the offset topic + val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString, + "--replication-factor", "1", + "--topic", OffsetManager.OffsetsTopicName)) + TopicCommand.createTopic(zkClient, createOffsetTopicOpts) + + // try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't + val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", OffsetManager.OffsetsTopicName)) + val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(OffsetManager.OffsetsTopicName) + assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deleteOffsetTopicPath)) + var exceptionThrown = false + try { + TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts) + fail("Delete of offset topic should had thrown an exception") + } catch { + case e: AdminOperationException => {exceptionThrown = true} + } + assertTrue("AdminOperationException should had been thrown.", exceptionThrown) + assertFalse("Delete path for topic shouldn't exist after deletion.", zkClient.exists(deleteOffsetTopicPath)) + } } \ No newline at end of file -- 1.9.3 (Apple Git-50)