From b8b1ee2beecb2e430457cafc309cd8a76fc6d05c Mon Sep 17 00:00:00 2001 From: Anatoly Fayngelerin Date: Tue, 9 Dec 2014 11:31:08 -0500 Subject: [PATCH] Making sure that the ZK Consumer Connector tests clean up all consumer threads. This was causing test failures in the ServerShutdownTest. ServerShutdownTest were missing null checks. --- .../unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala | 3 ++- .../test/scala/unit/kafka/server/ServerShutdownTest.scala | 12 ++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index 3ccccbd..a17e853 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -387,7 +387,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar assertEquals(Set[Int](0, 1), rebalanceListener1.partitionOwnership.get(topic)) assertEquals(true, rebalanceListener2.listenerCalled) assertEquals(null, rebalanceListener2.partitionOwnership.get(topic)) - + zkConsumerConnector1.shutdown() + zkConsumerConnector2.shutdown() } def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = { diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 67918f2..ba1e48e 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -141,10 +141,18 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { verifyNonDaemonThreadsStatus } + private[this] def isNonDaemonKafkaThread(t: Thread): Boolean = { + val threadName = Option(t.getClass.getCanonicalName) + .getOrElse(t.getClass.getName()) + .toLowerCase + + !t.isDaemon && t.isAlive && threadName.startsWith("kafka") + } + def verifyNonDaemonThreadsStatus() { assertEquals(0, Thread.getAllStackTraces.keySet().toArray - .map(_.asInstanceOf[Thread]) - .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka"))) + .map{ _.asInstanceOf[Thread] } + .count(isNonDaemonKafkaThread)) } def testConsecutiveShutdown(){ -- 1.7.12.4 (Apple Git-37)