diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 08b4b72..36b167b 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -25,7 +25,7 @@ import kafka.cluster._ import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNodeExistsException import java.net.InetAddress -import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} +import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient} import org.apache.zookeeper.Watcher.Event.KeeperState import java.util.UUID import kafka.serializer._ @@ -90,6 +90,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val messageStreamCreated = new AtomicBoolean(false) private var sessionExpirationListener: ZKSessionExpireListener = null + private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null private var loadBalancerListener: ZKRebalancerListener = null private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null @@ -268,8 +269,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - - class ZKSessionExpireListener(val dirs: ZKGroupDirs, val consumerIdString: String, val topicCount: TopicCount, @@ -306,6 +305,29 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } + class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener) + extends IZkDataListener { + + def handleDataChange(dataPath : String, data: Object) { + try { + info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering rebalance") + // explicitly trigger load balancing for this consumer + loadBalancerListener.syncedRebalance() + + // There is no need to re-subscribe the watcher since it will be automatically + // re-registered upon firing of 
this event by zkClient + } catch { + case e: Throwable => error("Error while handling topic partition change for data path " + dataPath, e) + } + } + + @throws(classOf[Exception]) + def handleDataDeleted(dataPath : String) { + // TODO: This needs to be implemented when we support delete topic + warn("Topic for path " + dataPath + " gets deleted, which should not happen at this time") + } + } + class ZKRebalancerListener(val group: String, val consumerIdString: String, val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]]) extends IZkChildListener { @@ -626,11 +648,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]]) } - // register listener for session expired event + // create listener for session expired event if it does not exist yet if (sessionExpirationListener == null) sessionExpirationListener = new ZKSessionExpireListener( dirs, consumerIdString, topicCount, loadBalancerListener) + // create listener for topic partition change event if it does not exist yet + if (topicPartitionChangeListener == null) + topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener) + val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams // map of {topic -> Set(thread-1, thread-2, ...)} @@ -686,8 +712,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, topicStreamsMap.foreach { topicAndStreams => // register on broker partition path changes - val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1 - zkClient.subscribeChildChanges(partitionPath, loadBalancerListener) + val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1 + zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener) } // explicitly trigger load balancing for this consumer