Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1245019) +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -379,17 +379,20 @@ private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") { override def run() { info("starting watcher executor thread for consumer " + consumerIdString) + var doRebalance = false while (!isShuttingDown.get) { try { lock.lock() try { if (!isWatcherTriggered) - cond.await() + cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag } finally { + doRebalance = isWatcherTriggered isWatcherTriggered = false lock.unlock() } - syncedRebalance + if (doRebalance) + syncedRebalance } catch { case t => error("error during syncedRebalance", t) }