diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 919aeb2..9c7c584b 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -206,13 +206,18 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, replicas: Seq[Int], callback: (RequestOrResponse) => Unit = null) { - brokerIds.filter(b => b >= 0).foreach { brokerId => - leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo]) - leaderAndIsrRequestMap(brokerId).put((topic, partition), - PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet)) + val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partition) + if (!controller.deleteTopicManager.isTopicAndPartitionDeletionInProgress(topicAndPartition)) { + brokerIds.filter(b => b >= 0).foreach { + brokerId => + leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo]) + leaderAndIsrRequestMap(brokerId).put((topic, partition), + PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet)) + } } + addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, - Set(TopicAndPartition(topic, partition))) + Set(topicAndPartition)) } def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean, diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 933de9d..401bf1e 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -337,11 +337,13 @@ 
class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg * required to clean up internal controller data structures */ def onControllerResignation() { + if (deleteTopicManager != null) + deleteTopicManager.shutdown() + inLock(controllerContext.controllerLock) { if (config.autoLeaderRebalanceEnable) autoRebalanceScheduler.shutdown() - if (deleteTopicManager != null) - deleteTopicManager.shutdown() + Utils.unregisterMBean(KafkaController.MBeanName) partitionStateMachine.shutdown() replicaStateMachine.shutdown() @@ -644,8 +646,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg def shutdown() = { inLock(controllerContext.controllerLock) { isRunning = false - onControllerResignation() } + onControllerResignation() } def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = { diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 0e47dac..6b4f32e 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -275,6 +275,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { replicaState.filter(r => r._1.topic.equals(topic) && r._2 == state).keySet } + def isAnyReplicaInState(topic: String, state: ReplicaState): Boolean = { + replicaState.exists(r => r._1.topic.equals(topic) && r._2 == state) + } + def replicasInDeletionStates(topic: String): Set[PartitionAndReplica] = { val deletionStates = Set(ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible) replicaState.filter(r => r._1.topic.equals(topic) && deletionStates.contains(r._2)).keySet diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index e4bc243..96e14cd 100644 --- 
a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -69,6 +69,7 @@ import java.util.concurrent.atomic.AtomicBoolean class TopicDeletionManager(controller: KafkaController, initialTopicsToBeDeleted: Set[String] = Set.empty, initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging { + this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], " val controllerContext = controller.controllerContext val partitionStateMachine = controller.partitionStateMachine val replicaStateMachine = controller.replicaStateMachine @@ -81,14 +82,13 @@ class TopicDeletionManager(controller: KafkaController, val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false) var deleteTopicsThread: DeleteTopicsThread = null val isDeleteTopicEnabled = controller.config.deleteTopicEnable - @volatile var isShuttingDown = false + var isShuttingDown = new AtomicBoolean(false) /** * Invoked at the end of new controller initiation */ def start() { - if(isDeleteTopicEnabled) { - isShuttingDown = false + if (isDeleteTopicEnabled) { deleteTopicsThread = new DeleteTopicsThread() deleteTopicStateChanged.set(true) deleteTopicsThread.start() @@ -99,14 +99,13 @@ class TopicDeletionManager(controller: KafkaController, * Invoked when the current controller resigns. 
At this time, all state for topic deletion should be cleared */ def shutdown() { - if(isDeleteTopicEnabled) { - isShuttingDown = true + if (isDeleteTopicEnabled && isShuttingDown.compareAndSet(false, true)) { resumeTopicDeletionThread() deleteTopicsThread.shutdown() topicsToBeDeleted.clear() partitionsToBeDeleted.clear() topicsIneligibleForDeletion.clear() - isShuttingDown = false + isShuttingDown.set(false) } } @@ -194,6 +193,13 @@ class TopicDeletionManager(controller: KafkaController, false } + def isTopicAndPartitionDeletionInProgress(topicAndPartition: TopicAndPartition) = { + if(isDeleteTopicEnabled) { + partitionsToBeDeleted.contains(topicAndPartition) + } else + false + } + def isTopicQueuedUpForDeletion(topic: String): Boolean = { if(isDeleteTopicEnabled) { topicsToBeDeleted.contains(topic) @@ -207,7 +213,7 @@ class TopicDeletionManager(controller: KafkaController, */ private def awaitTopicDeletionNotification() { inLock(deleteLock) { - while(!isShuttingDown && !deleteTopicStateChanged.compareAndSet(true, false)) { + while(!isShuttingDown.get && !deleteTopicStateChanged.compareAndSet(true, false)) { info("Waiting for signal to start or continue topic deletion") deleteTopicsCond.await() } @@ -257,6 +263,8 @@ class TopicDeletionManager(controller: KafkaController, private def markTopicForDeletionRetry(topic: String) { // reset replica states from ReplicaDeletionIneligible to OfflineReplica val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible) + info("Retrying delete topic for topic %s since replicas %s were not successfully deleted" + .format(topic, failedReplicas.mkString(","))) controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica) } @@ -324,8 +332,10 @@ class TopicDeletionManager(controller: KafkaController, debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(","))) controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, 
ReplicaDeletionStarted, new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build) - if(deadReplicasForTopic.size > 0) + if(deadReplicasForTopic.size > 0) { + debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic)) markTopicIneligibleForDeletion(Set(topic)) + } } } @@ -365,12 +375,12 @@ class TopicDeletionManager(controller: KafkaController, } } - class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread", isInterruptible = false) { + class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) { val zkClient = controllerContext.zkClient override def doWork() { awaitTopicDeletionNotification() - if(!isRunning.get) + if (isShuttingDown.get || !isRunning.get) return inLock(controllerContext.controllerLock) { @@ -395,13 +405,12 @@ class TopicDeletionManager(controller: KafkaController, partitions.mkString(","), topic)) } else { // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in - // TopicDeletionSuccessful. That means, there is at least one failed replica, which means topic deletion - // should be retried - val replicasInTopicDeletionFailedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible) - // mark topic for deletion retry - markTopicForDeletionRetry(topic) - info("Retrying delete topic for topic %s since replicas %s were not successfully deleted" - .format(topic, replicasInTopicDeletionFailedState.mkString(","))) + // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion + // or there is at least one failed replica (which means topic deletion should be retried). + if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) { + // mark topic for deletion retry + markTopicForDeletionRetry(topic) + } } } // Try delete topic if it is eligible for deletion. 
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index ac67f08..d09959d 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -44,7 +44,6 @@ class LogManager(val logDirs: Array[File], val retentionCheckMs: Long, scheduler: Scheduler, private val time: Time) extends Logging { - val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" val LockFile = ".lock" val InitialTaskDelayMs = 30*1000 @@ -246,6 +245,8 @@ class LogManager(val logDirs: Array[File], Some(log) } + def doesLogExists(topicAndPartition: TopicAndPartition) = logs.contains(topicAndPartition) + /** * Create a log for the given topic and the given partition * If the log already exists, just return a copy of the existing log diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0b668f2..a4d494f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -40,6 +40,7 @@ import org.I0Itec.zkclient.ZkClient class KafkaApis(val requestChannel: RequestChannel, val replicaManager: ReplicaManager, val offsetManager: OffsetManager, + val logManager: LogManager, val zkClient: ZkClient, val brokerId: Int, val config: KafkaConfig, @@ -103,6 +104,15 @@ class KafkaApis(val requestChannel: RequestChannel, // stop serving data to clients for the topic being deleted val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest] val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) + if(stopReplicaRequest.deletePartitions) { + // Delete log and corresponding folders in case replica manager doesn't hold them anymore. + // This could happen when topic is being deleted while broker is down and recovers. 
+ stopReplicaRequest.partitions.foreach(tp => { + if(logManager.doesLogExists(tp)) { + logManager.deleteLog(tp) + } + }) + } val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads() diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index c208f83..605f3b1 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -91,7 +91,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg kafkaController = new KafkaController(config, zkClient) /* start processing requests */ - apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController) + apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, logManager, zkClient, config.brokerId, config, kafkaController) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) Mx4jLoader.maybeLoad() diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 9c29e14..4662f58 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -33,376 +33,374 @@ import kafka.api.PartitionOffsetRequestInfo class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { - /* Temporarily disable all tests until delete topic is fixed. - * Add a fake test to let junit tests pass. - */ @Test - def testFake() { + def testDeleteTopicWithAllAliveReplicas() { + val topicAndPartition = TopicAndPartition("test", 0) + val topic = topicAndPartition.topic + val servers = createTestTopicAndCluster(topic) + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + verifyTopicDeletion(topic, servers) + servers.foreach(_.shutdown()) } + @Test + def testResumeDeleteTopicWithRecoveredFollower() { + val topicAndPartition = TopicAndPartition("test", 0) + val topic = topicAndPartition.topic + val servers = createTestTopicAndCluster(topic) + // shut down one follower replica + val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) + val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last + follower.shutdown() + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + // check if all replicas but the one that is shut down has deleted the log + TestUtils.waitUntilTrue(() => + servers.filter(s => s.config.brokerId != follower.config.brokerId) + .foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log in 1000ms", 1000) + // ensure topic deletion is halted + TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), + "Admin path /admin/delete_topic/test path deleted in 1000ms even when a follower replica is 
down", 500) + // restart follower replica + follower.startup() + verifyTopicDeletion(topic, servers) + servers.foreach(_.shutdown()) + } - /* - @Test - def testDeleteTopicWithAllAliveReplicas() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - verifyTopicDeletion(topic, servers) - servers.foreach(_.shutdown()) - } - - @Test - def testResumeDeleteTopicWithRecoveredFollower() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic - val servers = createTestTopicAndCluster(topic) - // shut down one follower replica - val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) - assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) - val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last - follower.shutdown() - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // check if all replicas but the one that is shut down has deleted the log - assertTrue("Replicas 0,1 have not deleted log in 1000ms", TestUtils.waitUntilTrue(() => - servers.filter(s => s.config.brokerId != follower.config.brokerId) - .foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty), 1000)) - // ensure topic deletion is halted - assertTrue("Admin path /admin/delete_topic/test path deleted in 1000ms even when a follower replica is down", - TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 500)) - // restart follower replica - follower.startup() - verifyTopicDeletion(topic, servers) - servers.foreach(_.shutdown()) - } + @Test + def testResumeDeleteTopicOnControllerFailover() { + val topicAndPartition = TopicAndPartition("test", 0) + val topic = topicAndPartition.topic + val servers = createTestTopicAndCluster(topic) + // start topic deletion + 
AdminUtils.deleteTopic(zkClient, topic) + // shut down the controller to trigger controller failover during delete topic + val controllerId = ZkUtils.getController(zkClient) + val controller = servers.filter(s => s.config.brokerId == controllerId).head + controller.shutdown() + // ensure topic deletion is halted + TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), + "Admin path /admin/delete_topic/test path deleted in 500ms even when a replica is down", 500) + // restart follower replica + controller.startup() + // wait until admin path for delete topic is deleted, signaling completion of topic deletion + TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), + "Admin path /admin/delete_topic/test path not deleted in 4000ms even after a follower replica is restarted", 4000) + TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), + "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted", 100) + // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper + assertTrue("Replica logs not deleted after delete topic is complete", + servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty)) + servers.foreach(_.shutdown()) + } - @Test - def testResumeDeleteTopicOnControllerFailover() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // shut down the controller to trigger controller failover during delete topic - val controllerId = ZkUtils.getController(zkClient) - val controller = servers.filter(s => s.config.brokerId == controllerId).head - controller.shutdown() - // ensure topic deletion is halted - assertTrue("Admin path /admin/delete_topic/test path deleted in 500ms 
even when a replica is down", - TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 500)) - // restart follower replica - controller.startup() - // wait until admin path for delete topic is deleted, signaling completion of topic deletion - assertTrue("Admin path /admin/delete_topic/test path not deleted in 4000ms even after a follower replica is restarted", - TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 4000)) - assertTrue("Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted", - TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), 100)) - // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper - assertTrue("Replica logs not deleted after delete topic is complete", - servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty)) - servers.foreach(_.shutdown()) + @Test + def testRequestHandlingDuringDeleteTopic() { + val topicAndPartition = TopicAndPartition("test", 0) + val topic = topicAndPartition.topic + val servers = createTestTopicAndCluster(topic) + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + // shut down one follower replica + var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) + val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last + follower.shutdown() + // test if produce requests are failed with UnknownTopicOrPartitionException during delete topic + val props1 = new Properties() + props1.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(",")) + props1.put("serializer.class", "kafka.serializer.StringEncoder") + props1.put("request.required.acks", "1") + val producerConfig1 = new ProducerConfig(props1) + val producer1 
= new Producer[String, String](producerConfig1) + try { + producer1.send(new KeyedMessage[String, String](topic, "test", "test1")) + fail("Test should fail because the topic is being deleted") + } catch { + case e: FailedToSendMessageException => + case oe: Throwable => fail("fails with exception", oe) + } finally { + producer1.close() } - - @Test - def testRequestHandlingDuringDeleteTopic() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // shut down one follower replica - var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) - assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) - val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last - follower.shutdown() - // test if produce requests are failed with UnknownTopicOrPartitionException during delete topic - val props1 = new Properties() - props1.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(",")) - props1.put("serializer.class", "kafka.serializer.StringEncoder") - props1.put("request.required.acks", "1") - val producerConfig1 = new ProducerConfig(props1) - val producer1 = new Producer[String, String](producerConfig1) - try{ - producer1.send(new KeyedMessage[String, String](topic, "test", "test1")) - fail("Test should fail because the topic is being deleted") - } catch { - case e: FailedToSendMessageException => - case oe: Throwable => fail("fails with exception", oe) - } finally { - producer1.close() - } - // test if fetch requests fail during delete topic - servers.filter(s => s.config.brokerId != follower.config.brokerId).foreach { server => - val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64*1024, "") + // test if fetch requests fail during delete topic + val availableServers: Seq[KafkaServer] = 
servers.filter(s => s.config.brokerId != follower.config.brokerId).toSeq + availableServers.foreach { + server => + val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64 * 1024, "") val request = new FetchRequestBuilder() .clientId("test-client") .addFetch(topic, 0, 0, 10000) .build() val fetched = consumer.fetch(request) val fetchResponse = fetched.data(topicAndPartition) - assertTrue("Fetch should fail with UnknownTopicOrPartitionCode", fetchResponse.error == ErrorMapping.UnknownTopicOrPartitionCode) - } - // test if offset requests fail during delete topic - servers.filter(s => s.config.brokerId != follower.config.brokerId).foreach { server => - val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64*1024, "") + assertEquals("Fetch should fail with UnknownTopicOrPartitionCode", ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.error) + } + // test if offset requests fail during delete topic + availableServers.foreach { + server => + val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64 * 1024, "") val offsetRequest = new OffsetRequest(Map(topicAndPartition -> new PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) val offsetResponse = consumer.getOffsetsBefore(offsetRequest) val errorCode = offsetResponse.partitionErrorAndOffsets(topicAndPartition).error - assertTrue("Offset request should fail with UnknownTopicOrPartitionCode", errorCode == ErrorMapping.UnknownTopicOrPartitionCode) - } - // restart follower replica - follower.startup() - verifyTopicDeletion(topic, servers) - servers.foreach(_.shutdown()) + assertEquals("Offset request should fail with UnknownTopicOrPartitionCode", ErrorMapping.UnknownTopicOrPartitionCode, errorCode) } + // restart follower replica + follower.startup() + verifyTopicDeletion(topic, availableServers) + servers.foreach(_.shutdown()) + } - @Test - def testDeleteTopicDuringPreferredReplicaElection() { - val topic = 
"test" - val topicAndPartition = TopicAndPartition(topic, 0) - val servers = createTestTopicAndCluster(topic) - var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) - assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) - // shut down the controller to move the leader to a non preferred replica before delete topic - val preferredReplicaId = 0 - val preferredReplica = servers.filter(s => s.config.brokerId == preferredReplicaId).head - preferredReplica.shutdown() - preferredReplica.startup() - val newLeaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 3000, leaderIdOpt) - assertTrue("New leader should be elected prior to delete topic", newLeaderIdOpt.isDefined) - // test preferred replica election - val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(topicAndPartition)) - preferredReplicaElection.moveLeaderToPreferredReplica() - // start topic deletion during preferred replica election. This should halt topic deletion but eventually - // complete it successfully - AdminUtils.deleteTopic(zkClient, topic) - val newControllerId = ZkUtils.getController(zkClient) - val newController = servers.filter(s => s.config.brokerId == newControllerId).head - assertTrue("Preferred replica election should succeed after 1000ms", TestUtils.waitUntilTrue(() => - !newController.kafkaController.controllerContext.partitionsUndergoingPreferredReplicaElection.contains(topicAndPartition), 1000)) - verifyTopicDeletion(topic, servers) - servers.foreach(_.shutdown()) - } + @Test + def testDeleteTopicDuringPreferredReplicaElection() { + val topic = "test" + val topicAndPartition = TopicAndPartition(topic, 0) + val servers = createTestTopicAndCluster(topic) + var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) + // shut down the controller to move the leader to a non preferred replica before delete 
topic + val preferredReplicaId = 0 + val preferredReplica = servers.filter(s => s.config.brokerId == preferredReplicaId).head + preferredReplica.shutdown() + preferredReplica.startup() + val newLeaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 3000, leaderIdOpt) + assertTrue("New leader should be elected prior to delete topic", newLeaderIdOpt.isDefined) + // test preferred replica election + val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(topicAndPartition)) + preferredReplicaElection.moveLeaderToPreferredReplica() + // start topic deletion during preferred replica election. This should halt topic deletion but eventually + // complete it successfully + AdminUtils.deleteTopic(zkClient, topic) + val newControllerId = ZkUtils.getController(zkClient) + val newController = servers.filter(s => s.config.brokerId == newControllerId).head + TestUtils.waitUntilTrue(() => + !newController.kafkaController.controllerContext.partitionsUndergoingPreferredReplicaElection.contains(topicAndPartition), + "Preferred replica election should succeed after 1000ms", 1000) + verifyTopicDeletion(topic, servers) + servers.foreach(_.shutdown()) + } - @Test - def testPartitionReassignmentDuringDeleteTopic() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) - val topic = "test" - val topicAndPartition = TopicAndPartition(topic, 0) - val brokerConfigs = TestUtils.createBrokerConfigs(4) - brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) - // create brokers - val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) - val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) - // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) - // wait until replica log is created on every broker - assertTrue("Replicas for topic test not created in 1000ms", 
TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => - res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000)) - var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) - assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since - // the topic is being deleted - // reassign partition 0 - val oldAssignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) - val newReplicas = Seq(1, 2, 3) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) - assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions()) - // wait until reassignment is completed - TestUtils.waitUntilTrue(() => { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, - Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed; - }, 1000) - val controllerId = ZkUtils.getController(zkClient) - val controller = servers.filter(s => s.config.brokerId == controllerId).head - assertFalse("Partition reassignment should fail", - controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition)) - val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) - assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas) - verifyTopicDeletion(topic, servers) - allServers.foreach(_.shutdown()) - } + @Test + def testPartitionReassignmentDuringDeleteTopic() { + val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) + val topic = "test" + val topicAndPartition = 
TopicAndPartition(topic, 0) + val brokerConfigs = TestUtils.createBrokerConfigs(4) + brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) + // create brokers + val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) + val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) + // create the topic + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + // wait until replica log is created on every broker + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => + res && server.getLogManager().getLog(topicAndPartition).isDefined), + "Replicas for topic test not created in 1000ms", 1000) + var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + // start partition reassignment at the same time right after delete topic. 
In this case, reassignment will fail since + // the topic is being deleted + // reassign partition 0 + val oldAssignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) + val newReplicas = Seq(1, 2, 3) + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) + assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions()) + // wait until reassignment is completed + TestUtils.waitUntilTrue(() => { + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed; + }, "Partition reassignment didn't finish within 1000 ms", 1000) + val controllerId = ZkUtils.getController(zkClient) + val controller = servers.filter(s => s.config.brokerId == controllerId).head + assertFalse("Partition reassignment should fail", + controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition)) + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) + assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas) + verifyTopicDeletion(topic, servers) + allServers.foreach(_.shutdown()) + } - @Test - def testDeleteTopicDuringPartitionReassignment() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) - val topic = "test" - val topicAndPartition = TopicAndPartition(topic, 0) - val brokerConfigs = TestUtils.createBrokerConfigs(4) - brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) - // create brokers - val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) - val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) - // create the topic - 
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) - // wait until replica log is created on every broker - assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => - res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000)) - var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) - assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) - // start partition reassignment at the same time right before delete topic. In this case, reassignment will succeed - // reassign partition 0 - val newReplicas = Seq(1, 2, 3) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) - assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // wait until reassignment is completed - TestUtils.waitUntilTrue(() => { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); - ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, - Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; - }, 1000) - val controllerId = ZkUtils.getController(zkClient) - val controller = servers.filter(s => s.config.brokerId == controllerId).head - assertFalse("Partition reassignment should complete", - controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition)) - val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) - assertEquals("Partition should be reassigned to 1,2,3", newReplicas, assignedReplicas) - verifyTopicDeletion(topic, allServers) - allServers.foreach(_.shutdown()) - } + @Test + def testDeleteTopicDuringPartitionReassignment() { + val 
expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) + val topic = "test" + val topicAndPartition = TopicAndPartition(topic, 0) + val brokerConfigs = TestUtils.createBrokerConfigs(4) + brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) + // create brokers + val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) + val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) + // create the topic + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + // wait until replica log is created on every broker + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => + res && server.getLogManager().getLog(topicAndPartition).isDefined), "Replicas for topic test not created in 1000ms", 1000) + var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) + assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) + // start partition reassignment at the same time right before delete topic. 
In this case, reassignment will succeed + // reassign partition 0 + val newReplicas = Seq(1, 2, 3) + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) + assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + // wait until reassignment is completed + TestUtils.waitUntilTrue(() => { + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); + ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas, + Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; + }, "Partition reassignment wasn't completed after 1000 ms", 1000) + val controllerId = ZkUtils.getController(zkClient) + val controller = servers.filter(s => s.config.brokerId == controllerId).head + assertFalse("Partition reassignment should complete", + controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition)) + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) + assertEquals("Partition should be reassigned to 1,2,3", newReplicas, assignedReplicas) + verifyTopicDeletion(topic, allServers) + allServers.foreach(_.shutdown()) + } - @Test - def testDeleteTopicDuringAddPartition() { - val topic = "test" - val servers = createTestTopicAndCluster(topic) - val newPartition = TopicAndPartition(topic, 1) - // add partitions to topic - AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2") - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // test if topic deletion is resumed - verifyTopicDeletion(topic, servers) - // verify that new partition doesn't exist on any broker either - assertTrue("Replica logs not for new partition [test,1] not deleted after delete topic is complete", TestUtils.waitUntilTrue(() => - 
servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty), 1000)) - servers.foreach(_.shutdown()) - } + @Test + def testDeleteTopicDuringAddPartition() { + val topic = "test" + val servers = createTestTopicAndCluster(topic) + val newPartition = TopicAndPartition(topic, 1) + // add partitions to topic + AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2") + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + // test if topic deletion is resumed + verifyTopicDeletion(topic, servers) + // verify that new partition doesn't exist on any broker either + TestUtils.waitUntilTrue(() => + servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty), + "Replica logs for new partition [test,1] not deleted after delete topic is complete", 1000) + servers.foreach(_.shutdown()) + } - @Test - def testAddPartitionDuringDeleteTopic() { - val topic = "test" - val topicAndPartition = TopicAndPartition(topic, 0) - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - // add partitions to topic - val newPartition = TopicAndPartition(topic, 1) - AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2") - verifyTopicDeletion(topic, servers) - // verify that new partition doesn't exist on any broker either - assertTrue("Replica logs not deleted after delete topic is complete", - servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty)) - servers.foreach(_.shutdown()) - } + @Test + def testAddPartitionDuringDeleteTopic() { + val topic = "test" + val topicAndPartition = TopicAndPartition(topic, 0) + val servers = createTestTopicAndCluster(topic) + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + // add partitions to topic + val newPartition = TopicAndPartition(topic, 1) + AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2") + verifyTopicDeletion(topic, 
servers) + // verify that new partition doesn't exist on any broker either + assertTrue("Replica logs not deleted after delete topic is complete", + servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty)) + servers.foreach(_.shutdown()) + } - @Test - def testRecreateTopicAfterDeletion() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) - val topic = "test" - val topicAndPartition = TopicAndPartition(topic, 0) - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - verifyTopicDeletion(topic, servers) - // re-create topic on same replicas - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) - // wait until leader is elected - val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) - assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) - // check if all replica logs are created - assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => - res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000)) - servers.foreach(_.shutdown()) - } + @Test + def testRecreateTopicAfterDeletion() { + val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) + val topic = "test" + val topicAndPartition = TopicAndPartition(topic, 0) + val servers = createTestTopicAndCluster(topic) + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + verifyTopicDeletion(topic, servers) + // re-create topic on same replicas + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + // wait until leader is elected + val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) + // check if all replica 
logs are created + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined), + "Replicas for topic test not created in 1000ms", 1000) + servers.foreach(_.shutdown()) + } - @Test - def testTopicConfigChangesDuringDeleteTopic() { - val topic = "test" - val servers = createTestTopicAndCluster(topic) - val topicConfigs = new Properties() - topicConfigs.put("segment.ms", "1000000") - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - verifyTopicDeletion(topic, servers) - // make topic config changes - try { - AdminUtils.changeTopicConfig(zkClient, topic, topicConfigs) - fail("Should fail with AdminOperationException for topic doesn't exist") - } catch { - case e: AdminOperationException => // expected - } - servers.foreach(_.shutdown()) + @Test + def testTopicConfigChangesDuringDeleteTopic() { + val topic = "test" + val servers = createTestTopicAndCluster(topic) + val topicConfigs = new Properties() + topicConfigs.put("segment.ms", "1000000") + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + verifyTopicDeletion(topic, servers) + // make topic config changes + try { + AdminUtils.changeTopicConfig(zkClient, topic, topicConfigs) + fail("Should fail with AdminOperationException for topic doesn't exist") + } catch { + case e: AdminOperationException => // expected } + servers.foreach(_.shutdown()) + } - @Test - def testAutoCreateAfterDeleteTopic() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, topic) - verifyTopicDeletion(topic, servers) - // test if first produce request after topic deletion auto creates the topic - val props = new Properties() - props.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(",")) - props.put("serializer.class", 
"kafka.serializer.StringEncoder") - props.put("producer.type", "sync") - props.put("request.required.acks", "1") - props.put("message.send.max.retries", "1") - val producerConfig = new ProducerConfig(props) - val producer = new Producer[String, String](producerConfig) - try{ - producer.send(new KeyedMessage[String, String](topic, "test", "test1")) - } catch { - case e: FailedToSendMessageException => fail("Topic should have been auto created") - case oe: Throwable => fail("fails with exception", oe) - } - // test the topic path exists - assertTrue("Topic not auto created", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) - // wait until leader is elected - val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) - assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) - try { - producer.send(new KeyedMessage[String, String](topic, "test", "test1")) - } catch { - case e: FailedToSendMessageException => fail("Topic should have been auto created") - case oe: Throwable => fail("fails with exception", oe) - } finally { - producer.close() - } - servers.foreach(_.shutdown()) + @Test + def testAutoCreateAfterDeleteTopic() { + val topicAndPartition = TopicAndPartition("test", 0) + val topic = topicAndPartition.topic + val servers = createTestTopicAndCluster(topic) + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + verifyTopicDeletion(topic, servers) + // test if first produce request after topic deletion auto creates the topic + val props = new Properties() + props.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(",")) + props.put("serializer.class", "kafka.serializer.StringEncoder") + props.put("producer.type", "sync") + props.put("request.required.acks", "1") + props.put("message.send.max.retries", "1") + val producerConfig = new ProducerConfig(props) + val producer = new Producer[String, String](producerConfig) + try { + 
producer.send(new KeyedMessage[String, String](topic, "test", "test1")) + } catch { + case e: FailedToSendMessageException => fail("Topic should have been auto created") + case oe: Throwable => fail("fails with exception", oe) } + // test the topic path exists + assertTrue("Topic not auto created", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) + // wait until leader is elected + val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) + try { + producer.send(new KeyedMessage[String, String](topic, "test", "test1")) + } catch { + case e: FailedToSendMessageException => fail("Topic should have been auto created") + case oe: Throwable => fail("fails with exception", oe) + } finally { + producer.close() + } + servers.foreach(_.shutdown()) + } - @Test - def testDeleteNonExistingTopic() { - val topicAndPartition = TopicAndPartition("test", 0) - val topic = topicAndPartition.topic - val servers = createTestTopicAndCluster(topic) - // start topic deletion - AdminUtils.deleteTopic(zkClient, "test2") - // verify delete topic path for test2 is removed from zookeeper - verifyTopicDeletion("test2", servers) - // verify that topic test is untouched - assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => - res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000)) - // test the topic path exists - assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) - // topic test should have a leader - val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) - assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined) - servers.foreach(_.shutdown()) + @Test + def testDeleteNonExistingTopic() { + val topicAndPartition = TopicAndPartition("test", 0) + val topic = 
topicAndPartition.topic + val servers = createTestTopicAndCluster(topic) + // start topic deletion + AdminUtils.deleteTopic(zkClient, "test2") + // verify delete topic path for test2 is removed from zookeeper + verifyTopicDeletion("test2", servers) + // verify that topic test is untouched + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => + res && server.getLogManager().getLog(topicAndPartition).isDefined), + "Replicas for topic test not created in 1000ms", 1000) + // test the topic path exists + assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) + // topic test should have a leader + val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined) + servers.foreach(_.shutdown()) - } + } private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) + val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topicAndPartition = TopicAndPartition(topic, 0) val brokerConfigs = TestUtils.createBrokerConfigs(3) brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) @@ -411,21 +409,21 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // wait until replica log is created on every broker - assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => - res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000)) + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => + res && server.getLogManager().getLog(topicAndPartition).isDefined), + "Replicas for topic test not created in 1000ms", 1000) servers } private def verifyTopicDeletion(topic: String, servers: Seq[KafkaServer]) { 
val topicAndPartition = TopicAndPartition(topic, 0) // wait until admin path for delete topic is deleted, signaling completion of topic deletion - assertTrue("Admin path /admin/delete_topic/test path not deleted in 1000ms even after a replica is restarted", - TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 1000)) - assertTrue("Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted", - TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), 100)) + TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), + "Admin path /admin/delete_topic/test path not deleted in 1000ms even after a replica is restarted", 1000) + TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), + "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted", 100) // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper assertTrue("Replica logs not deleted after delete topic is complete", servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty)) } - */ } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index b1c4ce9..b9a6924 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -93,7 +93,7 @@ class SimpleFetchTest extends JUnit3Suite { // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary) // don't provide replica or leader callbacks since they will not be tested here val requestChannel = new RequestChannel(2, 5) - val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller) + 
val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, logManager, zkClient, configs.head.brokerId, configs.head, controller) val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo]) apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo) @@ -167,7 +167,7 @@ class SimpleFetchTest extends JUnit3Suite { val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController]) val requestChannel = new RequestChannel(2, 5) - val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller) + val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, logManager, zkClient, configs.head.brokerId, configs.head, controller) val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo]) apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo) EasyMock.replay(partitionStateInfo)