diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala index cf3853b..4f20823 100644 --- a/core/src/main/scala/kafka/consumer/TopicFilter.scala +++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala @@ -41,14 +41,10 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging { override def toString = regex - def requiresTopicEventWatcher: Boolean - def isTopicAllowed(topic: String): Boolean } case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) { - override def requiresTopicEventWatcher = !regex.matches("""[\p{Alnum}-|]+""") - override def isTopicAllowed(topic: String) = { val allowed = topic.matches(regex) @@ -62,8 +58,6 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) { } case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) { - override def requiresTopicEventWatcher = true - override def isTopicAllowed(topic: String) = { val allowed = !topic.matches(regex) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index c0350cd..11fd984 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -754,19 +754,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount) reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams) - if (!topicFilter.requiresTopicEventWatcher) { - info("Not creating event watcher for trivial whitelist " + topicFilter) - } - else { - info("Creating topic event watcher for whitelist " + topicFilter) - wildcardTopicWatcher = new ZookeeperTopicEventWatcher(config, this) - - /* - * Topic events will trigger subsequent synced rebalances. Also, the - * consumer will get registered only after an allowed topic becomes - * available. - */ - } + /* + * Topic events will trigger subsequent synced rebalances. + */ + info("Creating topic event watcher for whitelist " + topicFilter) + wildcardTopicWatcher = new ZookeeperTopicEventWatcher(config, zkClient, this) def handleTopicEvent(allTopics: Seq[String]) { debug("Handling topic event") diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index a67c193..896b05e 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -22,14 +22,11 @@ import kafka.utils.{ZkUtils, ZKStringSerializer, Logging} import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} import org.apache.zookeeper.Watcher.Event.KeeperState -class ZookeeperTopicEventWatcher(val config:ConsumerConfig, +class ZookeeperTopicEventWatcher(val config: ConsumerConfig, val zkClient: ZkClient, val eventHandler: TopicEventHandler[String]) extends Logging { val lock = new Object() - private var zkClient: ZkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, - config.zkConnectionTimeoutMs, ZKStringSerializer) - startWatchingTopicEvents() private def startWatchingTopicEvents() { @@ -53,11 +50,10 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig, info("Shutting down topic event watcher.") if (zkClient != null) { stopWatchingTopicEvents() - zkClient.close() - zkClient = null } - else - warn("Cannot shutdown already shutdown topic event watcher.") + else { + warn("Cannot shutdown since the imbedded zookeeper client has already closed.") + } } } @@ -70,7 +66,6 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig, if (zkClient != null) { val latestTopics = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList debug("all topics: %s".format(latestTopics)) - eventHandler.handleTopicEvent(latestTopics) } } @@ -93,10 +88,8 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig, def handleNewSession() { lock.synchronized { if (zkClient != null) { - info( - "ZK expired: resubscribing topic event listener to topic registry") - zkClient.subscribeChildChanges( - ZkUtils.BrokerTopicsPath, topicEventListener) + info("ZK expired: resubscribing topic event listener to topic registry") + zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener) } } } diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala index 40a2bf7..cf2724b 100644 --- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala @@ -29,16 +29,13 @@ class TopicFilterTest extends JUnitSuite { def testWhitelists() { val topicFilter1 = new Whitelist("white1,white2") - assertFalse(topicFilter1.requiresTopicEventWatcher) assertTrue(topicFilter1.isTopicAllowed("white2")) assertFalse(topicFilter1.isTopicAllowed("black1")) val topicFilter2 = new Whitelist(".+") - assertTrue(topicFilter2.requiresTopicEventWatcher) assertTrue(topicFilter2.isTopicAllowed("alltopics")) val topicFilter3 = new Whitelist("white_listed-topic.+") - assertTrue(topicFilter3.requiresTopicEventWatcher) assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1")) assertFalse(topicFilter3.isTopicAllowed("black1")) } @@ -46,6 +43,5 @@ class TopicFilterTest extends JUnitSuite { @Test def testBlacklists() { val topicFilter1 = new Blacklist("black1") - assertTrue(topicFilter1.requiresTopicEventWatcher) } } \ No newline at end of file