diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 7dc2718..c8a56ee 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -37,9 +37,9 @@ import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} import java.util.concurrent.atomic.AtomicInteger import org.apache.log4j.Logger +import java.util.concurrent.locks.ReentrantLock import scala.Some import kafka.common.TopicAndPartition -import java.util.concurrent.locks.ReentrantLock class ControllerContext(val zkClient: ZkClient, val zkSessionTimeout: Int) { @@ -643,15 +643,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg def shutdown() = { inLock(controllerContext.controllerLock) { isRunning = false - partitionStateMachine.shutdown() - replicaStateMachine.shutdown() - if (config.autoLeaderRebalanceEnable) - autoRebalanceScheduler.shutdown() - if(controllerContext.controllerChannelManager != null) { - controllerContext.controllerChannelManager.shutdown() - controllerContext.controllerChannelManager = null - } - info("Controller shutdown complete") + onControllerResignation() } } diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index 488dfd0..43313ff 100644 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -22,6 +22,7 @@ import kafka.utils.Utils._ import collection.Set import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.api.{StopReplicaResponse, RequestOrResponse} +import java.util.concurrent.locks.ReentrantLock /** * This manages the state machine for topic deletion. 
@@ -71,9 +72,10 @@ class TopicDeletionManager(controller: KafkaController, val partitionStateMachine = controller.partitionStateMachine val replicaStateMachine = controller.replicaStateMachine var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted + val deleteLock = new ReentrantLock() var topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++ (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted) - val deleteTopicsCond = controllerContext.controllerLock.newCondition() + val deleteTopicsCond = deleteLock.newCondition() var deleteTopicStateChanged: Boolean = false var deleteTopicsThread: DeleteTopicsThread = null val isDeleteTopicEnabled = controller.config.deleteTopicEnable @@ -195,11 +197,13 @@ class TopicDeletionManager(controller: KafkaController, * controllerLock should be acquired before invoking this API */ private def awaitTopicDeletionNotification() { - while(!deleteTopicStateChanged) { - info("Waiting for signal to start or continue topic deletion") - deleteTopicsCond.await() + inLock(deleteLock) { + while(!deleteTopicStateChanged) { + info("Waiting for signal to start or continue topic deletion") + deleteTopicsCond.await() + } + deleteTopicStateChanged = false } - deleteTopicStateChanged = false } /** @@ -207,7 +211,9 @@ class TopicDeletionManager(controller: KafkaController, */ private def resumeTopicDeletionThread() { deleteTopicStateChanged = true - deleteTopicsCond.signal() + inLock(deleteLock) { + deleteTopicsCond.signal() + } } /** @@ -352,8 +358,9 @@ class TopicDeletionManager(controller: KafkaController, class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") { val zkClient = controllerContext.zkClient override def doWork() { + awaitTopicDeletionNotification() + inLock(controllerContext.controllerLock) { - awaitTopicDeletionNotification() val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted if(topicsQueuedForDeletion.size > 0) 
info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(",")) diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index 67497dd..519cf26 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -139,35 +139,37 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with @Test def testLog4jAppends() { - PropertyConfigurator.configure(getLog4jConfig) + val topic = TestUtils.tempTopic() + PropertyConfigurator.configure({ + val props = new Properties() + props.put("log4j.rootLogger", "INFO") + props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") + props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout") + props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n") + props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) + props.put("log4j.appender.KAFKA.Topic", topic) + props.put("log4j.logger.kafka.log4j", "INFO,KAFKA") + props.put("log4j.appender.KAFKA.requiredNumAcks", "1") + props + }) for(i <- 1 to 5) info("test") - val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build()) - val fetchMessage = response.messageSet("test-topic", 0) + TestUtils.waitUntilTrue(() => { + val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0L, 1024 * 1024).build()) + val fetchMessage = response.messageSet(topic, 0) - var count = 0 - for(message <- fetchMessage) { - count = count + 1 - } + var count = 0 + for (message <- fetchMessage) { + count = count + 1 + } - assertEquals(5, count) + count == 5 + }, 200) } - private def getLog4jConfig: Properties = { - var props = new Properties() - props.put("log4j.appender.KAFKA", 
"kafka.producer.KafkaLog4jAppender") - props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") - props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") - props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) - props.put("log4j.appender.KAFKA.Topic", "test-topic") - props.put("log4j.logger.kafka.log4j", "INFO,KAFKA") - props.put("log4j.appender.KAFKA.requiredNumAcks", "1") - props } -} class AppenderStringEncoder(encoding: String = "UTF-8") extends Encoder[LoggingEvent] { def toBytes(event: LoggingEvent): Array[Byte] = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index b5936d4..e69e0fe 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -31,16 +31,16 @@ class ReplicaManagerTest extends JUnit3Suite { @Test def testHighwaterMarkDirectoryMapping() { val props = TestUtils.createBrokerConfig(1) - val dir = "/tmp/kafka-logs/" - new File(dir).mkdir() - props.setProperty("log.dirs", dir) + val dir = TestUtils.tempDir() + props.setProperty("log.dirs", dir.getAbsolutePath + File.separator) val config = new KafkaConfig(props) val zkClient = EasyMock.createMock(classOf[ZkClient]) val mockLogMgr = EasyMock.createMock(classOf[LogManager]) val time: MockTime = new MockTime() + val topic = TestUtils.tempTopic() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) - val partition = rm.getOrCreatePartition("test-topic", 1, 1) - partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new File("/tmp/kafka-logs/test-topic-1"), new LogConfig(), 0L, null)))) + val partition = rm.getOrCreatePartition(topic, 1, 1) + partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new File(dir, topic), new 
LogConfig(), 0L, null)))) rm.checkpointHighWatermarks() } } diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 20fe93e..c7e058f 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -96,5 +96,25 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close() server.shutdown() Utils.rm(server.config.logDirs) + verifyNonDaemonThreadsStatus + } + + @Test + def testCleanShutdownWithDeleteTopicEnabled() { + val newProps = TestUtils.createBrokerConfig(0, port) + newProps.setProperty("delete.topic.enable", "true") + val newConfig = new KafkaConfig(newProps) + val server = new KafkaServer(newConfig) + server.startup() + server.shutdown() + server.awaitShutdown() + Utils.rm(server.config.logDirs) + verifyNonDaemonThreadsStatus + } + + def verifyNonDaemonThreadsStatus() { + assertEquals(0, Thread.getAllStackTraces.keySet().toArray + .map(_.asInstanceOf[Thread]) + .count(t => !t.isDaemon && t.isAlive && Option(t.getClass.getCanonicalName).exists(_.toLowerCase.startsWith("kafka")))) } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 2054c25..799668d 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -88,6 +88,8 @@ object TestUtils extends Logging { f } + def tempTopic(): String = "test-topic-" + random.nextInt(1000000) + /** * Create a temporary file */