diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index a821d60..1b3c04e 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -76,7 +76,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val servers = createTestTopicAndCluster(topic) val controllerId = ZkUtils.getController(zkClient) val controller = servers.filter(s => s.config.brokerId == controllerId).head - var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get && s.config.brokerId != controllerId).last follower.shutdown() @@ -97,62 +97,6 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { } @Test - def testRequestHandlingDuringDeleteTopic() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // shut down one follower replica - var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) - assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) - val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last - follower.shutdown() - // test if produce requests are failed with UnknownTopicOrPartitionException during delete topic - val props1 = new Properties() - props1.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(",")) - props1.put("serializer.class", "kafka.serializer.StringEncoder") - props1.put("request.required.acks", "1") - val producerConfig1 = new ProducerConfig(props1) - val producer1 = new Producer[String, String](producerConfig1) - try { - producer1.send(new KeyedMessage[String, String](topic, "test", "test1")) - fail("Test should fail because the topic is being deleted") - } catch { - case e: FailedToSendMessageException => - case oe: Throwable => fail("fails with exception", oe) - } finally { - producer1.close() - } - // test if fetch requests fail during delete topic - val availableServers: Seq[KafkaServer] = servers.filter(s => s.config.brokerId != follower.config.brokerId).toSeq - availableServers.foreach { - server => - val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64 * 1024, "") - val request = new FetchRequestBuilder() - .clientId("test-client") - .addFetch(topic, 0, 0, 10000) - .build() - val fetched = consumer.fetch(request) - val fetchResponse = fetched.data(topicAndPartition) - assertEquals("Fetch should fail with UnknownTopicOrPartitionCode", ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.error) - } - // test if offset requests fail during delete topic - availableServers.foreach { - server => - val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64 * 1024, "") - val offsetRequest = new OffsetRequest(Map(topicAndPartition -> new PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) - val offsetResponse = consumer.getOffsetsBefore(offsetRequest) - val errorCode = offsetResponse.partitionErrorAndOffsets(topicAndPartition).error - assertEquals("Offset request should fail with UnknownTopicOrPartitionCode", ErrorMapping.UnknownTopicOrPartitionCode, errorCode) - } - // restart follower replica - follower.startup() - verifyTopicDeletion(topic, availableServers) - servers.foreach(_.shutdown()) - } - - @Test def testPartitionReassignmentDuringDeleteTopic() { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" @@ -168,7 +112,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined), "Replicas for topic test not created.") - var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last follower.shutdown() @@ -202,7 +146,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { def testDeleteTopicDuringAddPartition() { val topic = "test" val servers = createTestTopicAndCluster(topic) - var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last val newPartition = TopicAndPartition(topic, 1) @@ -334,8 +278,11 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted") TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted") + // ensure that the topic-partition has been deleted from all brokers' replica managers + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.replicaManager.getPartition(topic, 0) == None), + "Replica manager's should have deleted all of this topic's partitions") // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper assertTrue("Replica logs not deleted after delete topic is complete", servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty)) } -} \ No newline at end of file +}