diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 36ddeb4..b5d8714 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -87,7 +87,17 @@ object AdminUtils extends Logging { ret.toMap } - def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") { + + /** + * Add partitions to existing topic with optional replica assignment + * + * @param zkClient Zookeeper client + * @param topic Topic for adding partitions to + * @param numPartitions Number of partitions to be set + * @param replicaAssignmentStr Manual replica assignment + * @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing + */ + def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "", checkBrokerAvailable: Boolean = true) { val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) if (existingPartitionsReplicaList.size == 0) throw new AdminOperationException("The topic %s does not exist".format(topic)) @@ -102,7 +112,7 @@ object AdminUtils extends Logging { val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "") AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size) else - getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size) + getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size, checkBrokerAvailable) // check if manual assignment has the right replication factor val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size)) @@ -117,7 +127,7 @@ object AdminUtils extends Logging { AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true) } - def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = { + def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = { var partitionList = replicaAssignmentList.split(",") val ret = new mutable.HashMap[Int, List[Int]]() var partitionId = startPartitionId @@ -128,7 +138,7 @@ object AdminUtils extends Logging { throw new AdminOperationException("replication factor must be larger than 0") if (brokerList.size != brokerList.toSet.size) throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList) - if (!brokerList.toSet.subsetOf(availableBrokerList)) + if (checkBrokerAvailable && !brokerList.toSet.subsetOf(availableBrokerList)) throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString + "available broker:" + availableBrokerList.toString) ret.put(partitionId, brokerList.toList) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 919aeb2..8763968 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -206,13 +206,17 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, replicas: Seq[Int], callback: (RequestOrResponse) => Unit = null) { - brokerIds.filter(b => b >= 0).foreach { brokerId => - leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo]) - leaderAndIsrRequestMap(brokerId).put((topic, partition), - PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet)) + val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partition) + + brokerIds.filter(b => b >= 0).foreach { + brokerId => + leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo]) + leaderAndIsrRequestMap(brokerId).put((topic, partition), + PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet)) } + addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, - Set(TopicAndPartition(topic, partition))) + Set(topicAndPartition)) } def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean, diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 933de9d..401bf1e 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -337,11 +337,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg * required to clean up internal controller data structures */ def onControllerResignation() { + if (deleteTopicManager != null) + deleteTopicManager.shutdown() + inLock(controllerContext.controllerLock) { if (config.autoLeaderRebalanceEnable) autoRebalanceScheduler.shutdown() - if (deleteTopicManager != null) - deleteTopicManager.shutdown() + Utils.unregisterMBean(KafkaController.MBeanName) partitionStateMachine.shutdown() replicaStateMachine.shutdown() @@ -644,8 +646,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg def shutdown() = { inLock(controllerContext.controllerLock) { isRunning = false - onControllerResignation() } + onControllerResignation() } def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = { diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 0e47dac..2f0f29d 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -233,8 +233,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { case Some(updatedLeaderIsrAndControllerEpoch) => // send the shrunk ISR state change request to all the remaining alive replicas of the partition. val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId), - topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment) + if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) { + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId), + topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment) + } replicaState.put(partitionAndReplica, OfflineReplica) stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) @@ -275,6 +277,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { replicaState.filter(r => r._1.topic.equals(topic) && r._2 == state).keySet } + def isAnyReplicaInState(topic: String, state: ReplicaState): Boolean = { + replicaState.exists(r => r._1.topic.equals(topic) && r._2 == state) + } + def replicasInDeletionStates(topic: String): Set[PartitionAndReplica] = { val deletionStates = Set(ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible) replicaState.filter(r => r._1.topic.equals(topic) && deletionStates.contains(r._2)).keySet diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index e4bc243..219c413 100644 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -69,6 +69,7 @@ import java.util.concurrent.atomic.AtomicBoolean class TopicDeletionManager(controller: KafkaController, initialTopicsToBeDeleted: Set[String] = Set.empty, initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging { + this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], " val controllerContext = controller.controllerContext val partitionStateMachine = controller.partitionStateMachine val replicaStateMachine = controller.replicaStateMachine @@ -81,14 +82,12 @@ class TopicDeletionManager(controller: KafkaController, val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false) var deleteTopicsThread: DeleteTopicsThread = null val isDeleteTopicEnabled = controller.config.deleteTopicEnable - @volatile var isShuttingDown = false /** * Invoked at the end of new controller initiation */ def start() { - if(isDeleteTopicEnabled) { - isShuttingDown = false + if (isDeleteTopicEnabled) { deleteTopicsThread = new DeleteTopicsThread() deleteTopicStateChanged.set(true) deleteTopicsThread.start() @@ -96,17 +95,18 @@ class TopicDeletionManager(controller: KafkaController, } /** - * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared + * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared. */ def shutdown() { - if(isDeleteTopicEnabled) { - isShuttingDown = true + // Only allow one shutdown to go through + if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) { + // Resume the topic deletion so it doesn't block on the condition resumeTopicDeletionThread() - deleteTopicsThread.shutdown() + // Await delete topic thread to exit + deleteTopicsThread.awaitShutdown() topicsToBeDeleted.clear() partitionsToBeDeleted.clear() topicsIneligibleForDeletion.clear() - isShuttingDown = false } } @@ -194,6 +194,13 @@ class TopicDeletionManager(controller: KafkaController, false } + def isPartitionToBeDeleted(topicAndPartition: TopicAndPartition) = { + if(isDeleteTopicEnabled) { + partitionsToBeDeleted.contains(topicAndPartition) + } else + false + } + def isTopicQueuedUpForDeletion(topic: String): Boolean = { if(isDeleteTopicEnabled) { topicsToBeDeleted.contains(topic) @@ -207,8 +214,8 @@ class TopicDeletionManager(controller: KafkaController, */ private def awaitTopicDeletionNotification() { inLock(deleteLock) { - while(!isShuttingDown && !deleteTopicStateChanged.compareAndSet(true, false)) { - info("Waiting for signal to start or continue topic deletion") + while(!deleteTopicsThread.isRunning.get() && !deleteTopicStateChanged.compareAndSet(true, false)) { + debug("Waiting for signal to start or continue topic deletion") deleteTopicsCond.await() } } @@ -257,6 +264,8 @@ class TopicDeletionManager(controller: KafkaController, private def markTopicForDeletionRetry(topic: String) { // reset replica states from ReplicaDeletionIneligible to OfflineReplica val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible) + info("Retrying delete topic for topic %s since replicas %s were not successfully deleted" + .format(topic, failedReplicas.mkString(","))) controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica) } @@ -281,7 +290,10 @@ class TopicDeletionManager(controller: KafkaController, /** * This callback is invoked by the DeleteTopics thread with the list of topics to be deleted - * It invokes the delete partition callback for all partitions of a topic + * It invokes the delete partition callback for all partitions of a topic. + * The updateMetadataRequest is also going to set the leader for the topics being deleted to + * {@link LeaderAndIsr#LeaderDuringDelete}. This lets each broker know that this topic is being deleted and can be + * removed from their caches. */ private def onTopicDeletion(topics: Set[String]) { info("Topic deletion callback for %s".format(topics.mkString(","))) @@ -305,10 +317,9 @@ class TopicDeletionManager(controller: KafkaController, * As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends * the replicas a StopReplicaRequest (delete=true) * This callback does the following things - - * 1. Send metadata request to all brokers excluding the topics to be deleted - * 2. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible + * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible * for deletion if some replicas are dead since it won't complete successfully anyway - * 3. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully + * 2. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully *@param replicasForTopicsToBeDeleted */ private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) { @@ -324,8 +335,10 @@ class TopicDeletionManager(controller: KafkaController, debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(","))) controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted, new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build) - if(deadReplicasForTopic.size > 0) + if(deadReplicasForTopic.size > 0) { + debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic)) markTopicIneligibleForDeletion(Set(topic)) + } } } @@ -365,12 +378,12 @@ class TopicDeletionManager(controller: KafkaController, } } - class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread", isInterruptible = false) { + class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) { val zkClient = controllerContext.zkClient override def doWork() { awaitTopicDeletionNotification() - if(!isRunning.get) + if (!isRunning.get) return inLock(controllerContext.controllerLock) { @@ -395,13 +408,12 @@ class TopicDeletionManager(controller: KafkaController, partitions.mkString(","), topic)) } else { // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in - // TopicDeletionSuccessful. That means, there is at least one failed replica, which means topic deletion - // should be retried - val replicasInTopicDeletionFailedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible) - // mark topic for deletion retry - markTopicForDeletionRetry(topic) - info("Retrying delete topic for topic %s since replicas %s were not successfully deleted" - .format(topic, replicasInTopicDeletionFailedState.mkString(","))) + // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion + // or there is at least one failed replica (which means topic deletion should be retried). + if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) { + // mark topic for deletion retry + markTopicForDeletionRetry(topic) + } } } // Try delete topic if it is eligible for deletion. diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index ac67f08..ab72cff 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -44,7 +44,6 @@ class LogManager(val logDirs: Array[File], val retentionCheckMs: Long, scheduler: Scheduler, private val time: Time) extends Logging { - val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" val LockFile = ".lock" val InitialTaskDelayMs = 30*1000 diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 11c20ce..6a56a77 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -125,7 +125,16 @@ class ReplicaManager(val config: KafkaConfig, if (removedPartition != null) removedPartition.delete() // this will delete the local log } - case None => //do nothing if replica no longer exists. This can happen during delete topic retries + case None => + // Delete log and corresponding folders in case replica manager doesn't hold them anymore. + // This could happen when topic is being deleted while broker is down and recovers. + if(deletePartition) { + val topicAndPartition = TopicAndPartition(topic, partitionId) + + if(logManager.getLog(topicAndPartition).isDefined) { + logManager.deleteLog(topicAndPartition) + } + } stateChangeLogger.trace("Broker %d ignoring stop replica (delete=%s) for partition [%s,%d] as replica doesn't exist on broker" .format(localBrokerId, deletePartition, topic, partitionId)) } diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala index cf8adc9..fc226c8 100644 --- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala +++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala @@ -27,20 +27,29 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean val isRunning: AtomicBoolean = new AtomicBoolean(true) private val shutdownLatch = new CountDownLatch(1) + def shutdown() = { + initiateShutdown() + awaitShutdown() + } - def shutdown(): Unit = { - info("Shutting down") - isRunning.set(false) - if (isInterruptible) - interrupt() - shutdownLatch.await() - info("Shutdown completed") + def initiateShutdown(): Boolean = { + if(isRunning.compareAndSet(true, false)) { + info("Shutting down") + isRunning.set(false) + if (isInterruptible) + interrupt() + true + } else + false } /** - * After calling shutdown(), use this API to wait until the shutdown is complete + * After calling initiateShutdown(), use this API to wait until the shutdown is complete */ - def awaitShutdown(): Unit = shutdownLatch.await() + def awaitShutdown(): Unit = { + shutdownLatch.await() + info("Shutdown completed") + } def doWork(): Unit diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 9c29e14..241572c 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -33,376 +33,286 @@ import kafka.api.PartitionOffsetRequestInfo class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { - /* Temporarily disable all tests until delete topic is fixed. - * Add a fake test to let junit tests pass. - */ @Test - def testFake() { + def testDeleteTopicWithAllAliveReplicas() { + val topicAndPartition = TopicAndPartition("test", 0) + val topic = topicAndPartition.topic + val servers = createTestTopicAndCluster(topic) + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + verifyTopicDeletion(topic, servers) + servers.foreach(_.shutdown()) } + @Test + def testResumeDeleteTopicWithRecoveredFollower() { + val topicAndPartition = TopicAndPartition("test", 0) + val topic = topicAndPartition.topic + val servers = createTestTopicAndCluster(topic) + // shut down one follower replica + val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) + val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last + follower.shutdown() + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + // check if all replicas but the one that is shut down has deleted the log + TestUtils.waitUntilTrue(() => + servers.filter(s => s.config.brokerId != follower.config.brokerId) + .foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.") + // ensure topic deletion is halted + TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), + "Admin path /admin/delete_topic/test path deleted even when a follower replica is down") + // restart follower replica + follower.startup() + verifyTopicDeletion(topic, servers) + servers.foreach(_.shutdown()) + } - /* - @Test - def testDeleteTopicWithAllAliveReplicas() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - verifyTopicDeletion(topic, servers) - servers.foreach(_.shutdown()) - } + @Test + def testResumeDeleteTopicOnControllerFailover() { + val topicAndPartition = TopicAndPartition("test", 0) + val topic = topicAndPartition.topic + val servers = createTestTopicAndCluster(topic) + val controllerId = ZkUtils.getController(zkClient) + val controller = servers.filter(s => s.config.brokerId == controllerId).head + var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last + follower.shutdown() - @Test - def testResumeDeleteTopicWithRecoveredFollower() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic - val servers = createTestTopicAndCluster(topic) - // shut down one follower replica - val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) - assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) - val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last - follower.shutdown() - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // check if all replicas but the one that is shut down has deleted the log - assertTrue("Replicas 0,1 have not deleted log in 1000ms", TestUtils.waitUntilTrue(() => - servers.filter(s => s.config.brokerId != follower.config.brokerId) - .foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty), 1000)) - // ensure topic deletion is halted - assertTrue("Admin path /admin/delete_topic/test path deleted in 1000ms even when a follower replica is down", - TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 500)) - // restart follower replica - follower.startup() - verifyTopicDeletion(topic, servers) - servers.foreach(_.shutdown()) - } + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + // shut down the controller to trigger controller failover during delete topic + controller.shutdown() - @Test - def testResumeDeleteTopicOnControllerFailover() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // shut down the controller to trigger controller failover during delete topic - val controllerId = ZkUtils.getController(zkClient) - val controller = servers.filter(s => s.config.brokerId == controllerId).head - controller.shutdown() - // ensure topic deletion is halted - assertTrue("Admin path /admin/delete_topic/test path deleted in 500ms even when a replica is down", - TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 500)) - // restart follower replica - controller.startup() - // wait until admin path for delete topic is deleted, signaling completion of topic deletion - assertTrue("Admin path /admin/delete_topic/test path not deleted in 4000ms even after a follower replica is restarted", - TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 4000)) - assertTrue("Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted", - TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), 100)) - // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper - assertTrue("Replica logs not deleted after delete topic is complete", - servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty)) - servers.foreach(_.shutdown()) - } + // ensure topic deletion is halted + TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), + "Admin path /admin/delete_topic/test path deleted even when a replica is down") - @Test - def testRequestHandlingDuringDeleteTopic() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // shut down one follower replica - var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) - assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) - val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last - follower.shutdown() - // test if produce requests are failed with UnknownTopicOrPartitionException during delete topic - val props1 = new Properties() - props1.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(",")) - props1.put("serializer.class", "kafka.serializer.StringEncoder") - props1.put("request.required.acks", "1") - val producerConfig1 = new ProducerConfig(props1) - val producer1 = new Producer[String, String](producerConfig1) - try{ - producer1.send(new KeyedMessage[String, String](topic, "test", "test1")) - fail("Test should fail because the topic is being deleted") - } catch { - case e: FailedToSendMessageException => - case oe: Throwable => fail("fails with exception", oe) - } finally { - producer1.close() - } - // test if fetch requests fail during delete topic - servers.filter(s => s.config.brokerId != follower.config.brokerId).foreach { server => - val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64*1024, "") + controller.startup() + follower.startup() + + verifyTopicDeletion(topic, servers) + servers.foreach(_.shutdown()) + } + + @Test + def testRequestHandlingDuringDeleteTopic() { + val topicAndPartition = TopicAndPartition("test", 0) + val topic = topicAndPartition.topic + val servers = createTestTopicAndCluster(topic) + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + // shut down one follower replica + var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) + val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last + follower.shutdown() + // test if produce requests are failed with UnknownTopicOrPartitionException during delete topic + val props1 = new Properties() + props1.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(",")) + props1.put("serializer.class", "kafka.serializer.StringEncoder") + props1.put("request.required.acks", "1") + val producerConfig1 = new ProducerConfig(props1) + val producer1 = new Producer[String, String](producerConfig1) + try { + producer1.send(new KeyedMessage[String, String](topic, "test", "test1")) + fail("Test should fail because the topic is being deleted") + } catch { + case e: FailedToSendMessageException => + case oe: Throwable => fail("fails with exception", oe) + } finally { + producer1.close() + } + // test if fetch requests fail during delete topic + val availableServers: Seq[KafkaServer] = servers.filter(s => s.config.brokerId != follower.config.brokerId).toSeq + availableServers.foreach { + server => + val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64 * 1024, "") val request = new FetchRequestBuilder() .clientId("test-client") .addFetch(topic, 0, 0, 10000) .build() val fetched = consumer.fetch(request) val fetchResponse = fetched.data(topicAndPartition) - assertTrue("Fetch should fail with UnknownTopicOrPartitionCode", fetchResponse.error == ErrorMapping.UnknownTopicOrPartitionCode) - } - // test if offset requests fail during delete topic - servers.filter(s => s.config.brokerId != follower.config.brokerId).foreach { server => - val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64*1024, "") + assertEquals("Fetch should fail with UnknownTopicOrPartitionCode", ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.error) + } + // test if offset requests fail during delete topic + availableServers.foreach { + server => + val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64 * 1024, "") val offsetRequest = new OffsetRequest(Map(topicAndPartition -> new PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) val offsetResponse = consumer.getOffsetsBefore(offsetRequest) val errorCode = offsetResponse.partitionErrorAndOffsets(topicAndPartition).error - assertTrue("Offset request should fail with UnknownTopicOrPartitionCode", errorCode == ErrorMapping.UnknownTopicOrPartitionCode) - } - // restart follower replica - follower.startup() - verifyTopicDeletion(topic, servers) - servers.foreach(_.shutdown()) - } - - @Test - def testDeleteTopicDuringPreferredReplicaElection() { - val topic = "test" - val topicAndPartition = TopicAndPartition(topic, 0) - val servers = createTestTopicAndCluster(topic) - var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) - assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) - // shut down the controller to move the leader to a non preferred replica before delete topic - val preferredReplicaId = 0 - val preferredReplica = servers.filter(s => s.config.brokerId == preferredReplicaId).head - preferredReplica.shutdown() - preferredReplica.startup() - val newLeaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 3000, leaderIdOpt) - assertTrue("New leader should be elected prior to delete topic", newLeaderIdOpt.isDefined) - // test preferred replica election - val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(topicAndPartition)) - preferredReplicaElection.moveLeaderToPreferredReplica() - // start topic deletion during preferred replica election. This should halt topic deletion but eventually - // complete it successfully - AdminUtils.deleteTopic(zkClient, topic) - val newControllerId = ZkUtils.getController(zkClient) - val newController = servers.filter(s => s.config.brokerId == newControllerId).head - assertTrue("Preferred replica election should succeed after 1000ms", TestUtils.waitUntilTrue(() => - !newController.kafkaController.controllerContext.partitionsUndergoingPreferredReplicaElection.contains(topicAndPartition), 1000)) - verifyTopicDeletion(topic, servers) - servers.foreach(_.shutdown()) - } - - @Test - def testPartitionReassignmentDuringDeleteTopic() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) - val topic = "test" - val topicAndPartition = TopicAndPartition(topic, 0) - val brokerConfigs = TestUtils.createBrokerConfigs(4) - brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) - // create brokers - val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) - val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) - // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) - // wait until replica log is created on every broker - assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => - res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000)) - var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) - assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since - // the topic is being deleted - // reassign partition 0 - val oldAssignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) - val newReplicas = Seq(1, 2, 3) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) - assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions()) - // wait until reassignment is completed - TestUtils.waitUntilTrue(() => { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, - Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed; - }, 1000) - val controllerId = ZkUtils.getController(zkClient) - val controller = servers.filter(s => s.config.brokerId == controllerId).head - assertFalse("Partition reassignment should fail", - controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition)) - val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) - assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas) - verifyTopicDeletion(topic, servers) - allServers.foreach(_.shutdown()) - } - - @Test - def testDeleteTopicDuringPartitionReassignment() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) - val topic = "test" - val topicAndPartition = TopicAndPartition(topic, 0) - val brokerConfigs = TestUtils.createBrokerConfigs(4) - brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) - // create brokers - val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) - val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) - // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) - // wait until replica log is created on every broker - assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => - res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000)) - var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) - assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) - // start partition reassignment at the same time right before delete topic. In this case, reassignment will succeed - // reassign partition 0 - val newReplicas = Seq(1, 2, 3) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) - assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // wait until reassignment is completed - TestUtils.waitUntilTrue(() => { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, - Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; - }, 1000) - val controllerId = ZkUtils.getController(zkClient) - val controller = servers.filter(s => s.config.brokerId == controllerId).head - assertFalse("Partition reassignment should complete", - controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition)) - val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) - assertEquals("Partition should be reassigned to 1,2,3", newReplicas, assignedReplicas) - verifyTopicDeletion(topic, allServers) - allServers.foreach(_.shutdown()) + assertEquals("Offset request should fail with UnknownTopicOrPartitionCode", ErrorMapping.UnknownTopicOrPartitionCode, errorCode) } + // restart follower replica + follower.startup() + verifyTopicDeletion(topic, availableServers) + servers.foreach(_.shutdown()) + } - @Test - def testDeleteTopicDuringAddPartition() { - val topic = "test" - val servers = createTestTopicAndCluster(topic) - val newPartition = TopicAndPartition(topic, 1) - // add partitions to topic - AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2") - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // test if topic deletion is resumed - verifyTopicDeletion(topic, servers) - // verify that new partition doesn't exist on any broker either - assertTrue("Replica logs not for new partition [test,1] not deleted after delete topic is complete", TestUtils.waitUntilTrue(() => - servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty), 1000)) - servers.foreach(_.shutdown()) - } + @Test + def testPartitionReassignmentDuringDeleteTopic() { + val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) + val topic = "test" + val topicAndPartition = TopicAndPartition(topic, 0) + val brokerConfigs = TestUtils.createBrokerConfigs(4) + brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) + // create brokers + val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) + val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) + // create the topic + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + // wait until replica log is created on every broker + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => + res && server.getLogManager().getLog(topicAndPartition).isDefined), + "Replicas for topic test not created.") + var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) + val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last + follower.shutdown() + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + // start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since + // the topic is being deleted + // reassign partition 0 + val oldAssignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) + val newReplicas = Seq(1, 2, 3) + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) + assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions()) + // wait until reassignment is completed + TestUtils.waitUntilTrue(() => { + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed; + }, "Partition reassignment shouldn't complete.") + val controllerId = ZkUtils.getController(zkClient) + val controller = servers.filter(s => s.config.brokerId == controllerId).head + assertFalse("Partition reassignment should fail", + controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition)) + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) + assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas) + follower.startup() + verifyTopicDeletion(topic, servers) + allServers.foreach(_.shutdown()) + } - @Test - def testAddPartitionDuringDeleteTopic() { - val topic = "test" - val topicAndPartition = TopicAndPartition(topic, 0) - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // add partitions to topic - val newPartition = TopicAndPartition(topic, 1) - AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2") - verifyTopicDeletion(topic, servers) - // verify that new partition doesn't exist on any broker either - assertTrue("Replica logs not deleted after delete topic is complete", - servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty)) - servers.foreach(_.shutdown()) - } + @Test + def testDeleteTopicDuringAddPartition() { + val topic = "test" + val servers = createTestTopicAndCluster(topic) + var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) + val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last + val newPartition = TopicAndPartition(topic, 1) + follower.shutdown() + // add partitions to topic + AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2", false) + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + follower.startup() + // test if topic deletion is resumed + verifyTopicDeletion(topic, servers) + // verify that new partition doesn't exist on any broker either + TestUtils.waitUntilTrue(() => + servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty), + "Replica logs not for new partition [test,1] not deleted after delete topic is complete.") + servers.foreach(_.shutdown()) + } - @Test - def testRecreateTopicAfterDeletion() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) - val topic = "test" - val topicAndPartition = TopicAndPartition(topic, 0) - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - verifyTopicDeletion(topic, servers) - // re-create topic on same replicas - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) - // wait until leader is elected - val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) - assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) - // check if all replica logs are created - assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => - res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000)) - servers.foreach(_.shutdown()) - } + @Test + def testAddPartitionDuringDeleteTopic() { + val topic = "test" + val topicAndPartition = TopicAndPartition(topic, 0) + val servers = createTestTopicAndCluster(topic) + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + // add partitions to topic + val newPartition = TopicAndPartition(topic, 1) + AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2") + verifyTopicDeletion(topic, servers) + // verify that new partition doesn't exist on any broker either + assertTrue("Replica logs not deleted after delete topic is complete", + servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty)) + servers.foreach(_.shutdown()) + } - @Test - def testTopicConfigChangesDuringDeleteTopic() { - val topic = "test" - val servers = createTestTopicAndCluster(topic) - val topicConfigs = new Properties() - topicConfigs.put("segment.ms", "1000000") - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - verifyTopicDeletion(topic, servers) - // make topic config changes - try { - AdminUtils.changeTopicConfig(zkClient, topic, topicConfigs) - fail("Should fail with AdminOperationException for topic doesn't exist") - } catch { - case e: AdminOperationException => // expected - } - servers.foreach(_.shutdown()) - } + @Test + def testRecreateTopicAfterDeletion() { + val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) + val topic = "test" + val topicAndPartition = TopicAndPartition(topic, 0) + val servers = createTestTopicAndCluster(topic) + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + verifyTopicDeletion(topic, servers) + // re-create topic on same replicas + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + // wait until leader is elected + val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) + // check if all replica logs are created + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined), + "Replicas for topic test not created.") + servers.foreach(_.shutdown()) + } - @Test - def testAutoCreateAfterDeleteTopic() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - verifyTopicDeletion(topic, servers) - // test if first produce request after topic deletion auto creates the topic - val props = new Properties() - props.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(",")) - props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("producer.type", "sync") - props.put("request.required.acks", "1") - props.put("message.send.max.retries", "1") - val producerConfig = new ProducerConfig(props) - val producer = new Producer[String, String](producerConfig) - try{ - producer.send(new KeyedMessage[String, String](topic, "test", "test1")) - } catch { - case e: FailedToSendMessageException => fail("Topic should have been auto created") - case oe: Throwable => fail("fails with exception", oe) - } - // test the topic path exists - assertTrue("Topic not auto created", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) - // wait until leader is elected - val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) - assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) - try { - producer.send(new KeyedMessage[String, String](topic, "test", "test1")) - } catch { - case e: FailedToSendMessageException => fail("Topic should have been auto created") - case oe: Throwable => fail("fails with exception", oe) - } finally { - producer.close() - } - servers.foreach(_.shutdown()) + @Test + def testAutoCreateAfterDeleteTopic() { + val topicAndPartition = TopicAndPartition("test", 0) + val topic = topicAndPartition.topic + val servers = createTestTopicAndCluster(topic) + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + verifyTopicDeletion(topic, servers) + // test if first produce request after topic deletion auto creates the topic + val props = new Properties() + props.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(",")) + props.put("serializer.class", "kafka.serializer.StringEncoder") + props.put("producer.type", "sync") + props.put("request.required.acks", "1") + props.put("message.send.max.retries", "1") + val producerConfig = new ProducerConfig(props) + val producer = new Producer[String, String](producerConfig) + try { + producer.send(new KeyedMessage[String, String](topic, "test", "test1")) + } catch { + case e: FailedToSendMessageException => fail("Topic should have been auto created") + case oe: Throwable => fail("fails with exception", oe) } + // test the topic path exists + assertTrue("Topic not auto created", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) + // wait until leader is elected + val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) + servers.foreach(_.shutdown()) + } - @Test - def testDeleteNonExistingTopic() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, "test2") - // verify delete topic path for test2 is removed from zookeeper - verifyTopicDeletion("test2", servers) - // verify that topic test is untouched - assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => - res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000)) - // test the topic path exists - assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) - // topic test should have a leader - val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) - assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined) - servers.foreach(_.shutdown()) + @Test + def testDeleteNonExistingTopic() { + val topicAndPartition = TopicAndPartition("test", 0) + val topic = topicAndPartition.topic + val servers = createTestTopicAndCluster(topic) + // start topic deletion + AdminUtils.deleteTopic(zkClient, "test2") + // verify delete topic path for test2 is removed from zookeeper + verifyTopicDeletion("test2", servers) + // verify that topic test is untouched + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => + res && server.getLogManager().getLog(topicAndPartition).isDefined), + "Replicas for topic test not created") + // test the topic path exists + assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) + // topic test should have a leader + val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined) + servers.foreach(_.shutdown()) - } + } private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) + val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topicAndPartition = TopicAndPartition(topic, 0) val brokerConfigs = TestUtils.createBrokerConfigs(3) brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) @@ -411,21 +321,21 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // wait until replica log is created on every broker - assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => - res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000)) + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => + res && server.getLogManager().getLog(topicAndPartition).isDefined), + "Replicas for topic test not created") servers } private def verifyTopicDeletion(topic: String, servers: Seq[KafkaServer]) { val topicAndPartition = TopicAndPartition(topic, 0) // wait until admin path for delete topic is deleted, signaling completion of topic deletion - assertTrue("Admin path /admin/delete_topic/test path not deleted in 1000ms even after a replica is restarted", - TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 1000)) - assertTrue("Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted", - TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), 100)) + TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), + "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted") + TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), + "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted") // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper assertTrue("Replica logs not deleted after delete topic is complete", servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty)) } - */ } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 5c48796..6c5c2bc 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -24,7 +24,7 @@ import kafka.integration.KafkaServerTestHarness import kafka.utils._ import kafka.common._ import kafka.log.LogConfig -import kafka.admin.AdminUtils +import kafka.admin.{AdminOperationException, AdminUtils} import org.scalatest.junit.JUnit3Suite class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { @@ -48,4 +48,15 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { } } + @Test + def testConfigChangeOnNonExistingTopic() { + val topic = TestUtils.tempTopic + try { + AdminUtils.changeTopicConfig(zkClient, topic, LogConfig(flushInterval = 10000).toProps) + fail("Should fail with AdminOperationException for topic doesn't exist") + } catch { + case e: AdminOperationException => // expected + } + } + } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 014e964..ab60e9b 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -101,7 +101,6 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { verifyNonDaemonThreadsStatus } - /* Temporarily disable the test until delete topic is fixed. @Test def testCleanShutdownWithDeleteTopicEnabled() { val newProps = TestUtils.createBrokerConfig(0, port) @@ -114,7 +113,6 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { Utils.rm(server.config.logDirs) verifyNonDaemonThreadsStatus } - */ def verifyNonDaemonThreadsStatus() { assertEquals(0, Thread.getAllStackTraces.keySet().toArray diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 00bfba4..034f361 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -87,6 +87,8 @@ object TestUtils extends Logging { f } + def tempTopic(): String = "testTopic" + random.nextInt(1000000) + /** * Create a temporary relative directory */