diff --git a/config/tools-log4j.properties b/config/tools-log4j.properties index 7924049..44254f5 100644 --- a/config/tools-log4j.properties +++ b/config/tools-log4j.properties @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=WARN, stdout +log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 08b4b72..df67dc5 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -25,7 +25,7 @@ import kafka.cluster._ import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNodeExistsException import java.net.InetAddress -import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} +import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient} import org.apache.zookeeper.Watcher.Event.KeeperState import java.util.UUID import kafka.serializer._ @@ -90,6 +90,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val messageStreamCreated = new AtomicBoolean(false) private var sessionExpirationListener: ZKSessionExpireListener = null + private var topicPartitionChangeListenner: ZKTopicPartitionChangeListener = null private var loadBalancerListener: ZKRebalancerListener = null private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null @@ -268,8 +269,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - - class ZKSessionExpireListener(val dirs: ZKGroupDirs, val consumerIdString: String, val topicCount: TopicCount, @@ -306,6 +305,29 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } + class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener) + extends IZkDataListener { + + def handleDataChange(dataPath : String, data: Object) { + try { + info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering rebalance") + // explicitly trigger load balancing for this consumer + + // There is no need to resubscribe to data changes. + // The data change watchers will be set inside rebalance when we read the path to get the partitions. + loadBalancerListener.syncedRebalance() + } catch { + case e: Throwable => error("Error while handling topic partition change for data path " + dataPath, e ) + } + } + + @throws(classOf[Exception]) + def handleDataDeleted(dataPath : String) { + // TODO: This need to be implemented when we support delete topic + warn("Topic for path " + dataPath + " gets deleted, which should not happen at this time") + } + } + class ZKRebalancerListener(val group: String, val consumerIdString: String, val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]]) extends IZkChildListener { @@ -631,6 +653,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, sessionExpirationListener = new ZKSessionExpireListener( dirs, consumerIdString, topicCount, loadBalancerListener) + // register listener for topic partition change event + if (topicPartitionChangeListenner == null) + topicPartitionChangeListenner = new ZKTopicPartitionChangeListener(loadBalancerListener) + val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams // map of {topic -> Set(thread-1, thread-2, ...)} @@ -686,8 +712,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, topicStreamsMap.foreach { topicAndStreams => // register on broker partition path changes - val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1 - zkClient.subscribeChildChanges(partitionPath, loadBalancerListener) + val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1 + zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListenner) } // explicitly trigger load balancing for this consumer