diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index cf3853b..4f20823 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -41,14 +41,10 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
 
   override def toString = regex
 
-  def requiresTopicEventWatcher: Boolean
-
   def isTopicAllowed(topic: String): Boolean
 }
 
 case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
-  override def requiresTopicEventWatcher = !regex.matches("""[\p{Alnum}-|]+""")
-
   override def isTopicAllowed(topic: String) = {
     val allowed = topic.matches(regex)
 
@@ -62,8 +58,6 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
 }
 
 case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
-  override def requiresTopicEventWatcher = true
-
   override def isTopicAllowed(topic: String) = {
     val allowed = !topic.matches(regex)
 
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 6d0cfa6..0cc236a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -754,19 +754,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
     reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
 
-    if (!topicFilter.requiresTopicEventWatcher) {
-      info("Not creating event watcher for trivial whitelist " + topicFilter)
-    }
-    else {
-      info("Creating topic event watcher for whitelist " + topicFilter)
-      wildcardTopicWatcher = new ZookeeperTopicEventWatcher(config, this)
-
-      /*
-       * Topic events will trigger subsequent synced rebalances. Also, the
-       * consumer will get registered only after an allowed topic becomes
-       * available.
-       */
-    }
+    /*
+     * Topic events will trigger subsequent synced rebalances.
+     */
+    info("Creating topic event watcher for topics " + topicFilter)
+    wildcardTopicWatcher = new ZookeeperTopicEventWatcher(zkClient, this)
 
     def handleTopicEvent(allTopics: Seq[String]) {
       debug("Handling topic event")
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index a67c193..38f4ec0 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -22,14 +22,11 @@ import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
-class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
+class ZookeeperTopicEventWatcher(val zkClient: ZkClient,
     val eventHandler: TopicEventHandler[String]) extends Logging {
 
   val lock = new Object()
 
-  private var zkClient: ZkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
-    config.zkConnectionTimeoutMs, ZKStringSerializer)
-
   startWatchingTopicEvents()
 
   private def startWatchingTopicEvents() {
@@ -53,11 +50,10 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
       info("Shutting down topic event watcher.")
       if (zkClient != null) {
         stopWatchingTopicEvents()
-        zkClient.close()
-        zkClient = null
       }
-      else
-        warn("Cannot shutdown already shutdown topic event watcher.")
+      else {
+        warn("Cannot shutdown since the embedded zookeeper client has already closed.")
+      }
     }
   }
 
@@ -70,7 +66,6 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
           if (zkClient != null) {
             val latestTopics = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList
             debug("all topics: %s".format(latestTopics))
-
             eventHandler.handleTopicEvent(latestTopics)
           }
         }
@@ -93,10 +88,8 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
     def handleNewSession() {
       lock.synchronized {
        if (zkClient != null) {
-          info(
-            "ZK expired: resubscribing topic event listener to topic registry")
-          zkClient.subscribeChildChanges(
-            ZkUtils.BrokerTopicsPath, topicEventListener)
+          info("ZK expired: resubscribing topic event listener to topic registry")
+          zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener)
         }
       }
     }
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 40a2bf7..cf2724b 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -29,16 +29,13 @@
   def testWhitelists() {
 
     val topicFilter1 = new Whitelist("white1,white2")
-    assertFalse(topicFilter1.requiresTopicEventWatcher)
     assertTrue(topicFilter1.isTopicAllowed("white2"))
     assertFalse(topicFilter1.isTopicAllowed("black1"))
 
     val topicFilter2 = new Whitelist(".+")
-    assertTrue(topicFilter2.requiresTopicEventWatcher)
     assertTrue(topicFilter2.isTopicAllowed("alltopics"))
 
     val topicFilter3 = new Whitelist("white_listed-topic.+")
-    assertTrue(topicFilter3.requiresTopicEventWatcher)
     assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1"))
     assertFalse(topicFilter3.isTopicAllowed("black1"))
   }
@@ -46,6 +43,5 @@
   @Test
   def testBlacklists() {
     val topicFilter1 = new Blacklist("black1")
-    assertTrue(topicFilter1.requiresTopicEventWatcher)
   }
 }
\ No newline at end of file
diff --git a/system_test/migration_tool_testsuite/0.7/config/test-log4j.properties b/system_test/migration_tool_testsuite/0.7/config/test-log4j.properties
deleted file mode 100644
index a3ae33f..0000000
--- a/system_test/migration_tool_testsuite/0.7/config/test-log4j.properties
+++ /dev/null
@@ -1,68 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-log4j.rootLogger=INFO, stdout
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.kafkaAppender.File=logs/server.log
-log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.stateChangeAppender.File=logs/state-change.log
-log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.requestAppender.File=logs/kafka-request.log
-log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.controllerAppender.File=logs/controller.log
-log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-# Turn on all our debugging info
-#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
-#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
-log4j.logger.kafka.perf=DEBUG, kafkaAppender
-log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
-#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
-log4j.logger.kafka=INFO, kafkaAppender
-
-log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
-log4j.additivity.kafka.network.RequestChannel$=false
-
-#log4j.logger.kafka.network.Processor=TRACE, requestAppender
-#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
-#log4j.additivity.kafka.server.KafkaApis=false
-log4j.logger.kafka.request.logger=TRACE, requestAppender
-log4j.additivity.kafka.request.logger=false
-
-log4j.logger.kafka.controller=TRACE, controllerAppender
-log4j.additivity.kafka.controller=false
-
-log4j.logger.state.change.logger=TRACE, stateChangeAppender
-log4j.additivity.state.change.logger=false
-
-
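For completeness, the TopicFilter change at the top of this patch removes requiresTopicEventWatcher entirely, leaving isTopicAllowed as the only decision callers make. A rough, illustrative sketch of the resulting behaviour, mirroring the updated TopicFilterTest (not part of the patch):

import kafka.consumer.{Whitelist, Blacklist}

// Commas in the raw pattern become regex alternation, so both topics match.
val whitelist = new Whitelist("white1,white2")
assert(whitelist.isTopicAllowed("white2"))
assert(!whitelist.isTopicAllowed("black1"))

// A wildcard whitelist is treated like any other pattern now; the consumer
// creates the topic event watcher regardless of how "trivial" the regex is.
val wildcard = new Whitelist(".+")
assert(wildcard.isTopicAllowed("alltopics"))

// Blacklists simply invert the match.
val blacklist = new Blacklist("black1")
assert(!blacklist.isTopicAllowed("black1"))
assert(blacklist.isTopicAllowed("white1"))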