Index: core/src/main/scala/kafka/consumer/ConsumerConfig.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerConfig.scala (revision 1242197) +++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala (working copy) @@ -30,6 +30,7 @@ val AutoCommit = true val AutoCommitInterval = 10 * 1000 val MaxQueuedChunks = 100 + val DefaultZkCallbackQueueSize=1000 val MaxRebalanceRetries = 4 val AutoOffsetReset = OffsetRequest.SmallestTimeString val ConsumerTimeoutMs = -1 @@ -75,6 +76,9 @@ /** max number of messages buffered for consumption */ val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MaxQueuedChunks) + /** number of ZK callbacks that can be queued */ + val zkCallbackQueuedSize = Utils.getInt(props, "zk.callback.queue.size", DefaultZkCallbackQueueSize) + /** max number of retries during rebalance */ val maxRebalanceRetries = Utils.getInt(props, "rebalance.retries.max", MaxRebalanceRetries) Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1242197) +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -30,8 +30,8 @@ import java.util.UUID import kafka.serializer.Decoder import kafka.common.{ConsumerRebalanceFailedException, InvalidConfigException} -import java.lang.IllegalStateException import kafka.utils.ZkUtils._ +import java.lang.{Thread, IllegalStateException} /** * This class handles the consumers interaction with zookeeper @@ -372,13 +372,30 @@ class ZKRebalancerListener[T](val group: String, val consumerIdString: String, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) extends IZkChildListener { - private val dirs = new ZKGroupDirs(group) private var oldPartitionsPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]() private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]() + private val watcherQueue = new LinkedBlockingQueue[Int](config.zkCallbackQueuedSize) + private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") { + override def run() { + info("starting watcher executor thread for consumer " + consumerIdString) + while (!isShuttingDown.get()) { + try { + watcherQueue.take() + debug("clearing watcher queue of size " + watcherQueue.size()) + watcherQueue.clear() + syncedRebalance + } catch { + case t => error("error during syncedRebalance", t) + } + } + info("stoping watcher executor thread for consumer " + consumerIdString) + } + } + watcherExecutorThread.start() @throws(classOf[Exception]) def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { - syncedRebalance + watcherQueue.put(1) } private def releasePartitionOwnership()= {