Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1293543)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy)
@@ -413,16 +413,16 @@
       }
     }
 
-    private def releasePartitionOwnership()= {
+    private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]])= {
       info("Releasing partition ownership")
-      for ((topic, infos) <- topicRegistry) {
+      for ((topic, infos) <- localTopicRegistry) {
         val topicDirs = new ZKGroupTopicDirs(group, topic)
         for(partition <- infos.keys) {
           val znode = topicDirs.consumerOwnerDir + "/" + partition
           deletePath(zkClient, znode)
           debug("Consumer " + consumerIdString + " releasing " + znode)
         }
-        topicRegistry.remove(topic)
+        localTopicRegistry.remove(topic)
       }
     }
 
@@ -446,8 +446,6 @@
                 * the value of a child. Just let this go since another rebalance will be triggered.
                 **/
               info("exception during rebalance ", e)
-              /* Explicitly make sure another rebalancing attempt will get triggered. */
-              done = false
           }
           info("end rebalancing consumer " + consumerIdString + " try #" + i)
           if (done) {
@@ -459,8 +457,6 @@
           }
           // stop all fetchers and clear all the queues to avoid data duplication
           closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
-          // release all partitions, reset state and retry
-          releasePartitionOwnership()
           Thread.sleep(config.rebalanceBackoffMs)
         }
       }
@@ -481,7 +477,7 @@
        */
      closeFetchers(cluster, kafkaMessageStreams, myTopicThreadIdsMap)
 
-      releasePartitionOwnership()
+      releasePartitionOwnership(topicRegistry)
 
      var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
      var currentTopicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
@@ -534,8 +530,11 @@
         topicRegistry = currentTopicRegistry
         updateFetcher(cluster, kafkaMessageStreams)
         true
-      }else
+      }else {
+        // release any partitions that we may have owned in ZK
+        releasePartitionOwnership(currentTopicRegistry)
         false
+      }
     }
 
     private def closeFetchersForQueues(cluster: Cluster,
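
For review context, a minimal, self-contained sketch of the release pattern this patch moves to: releasePartitionOwnership now takes the registry to release as an argument, so a normal rebalance releases topicRegistry while a failed attempt releases only the partitions tentatively claimed in currentTopicRegistry. This is illustrative only; the Partition stand-in, the deletePath stub, the owner znode layout, and plain Scala mutable maps in place of kafka.utils.Pool (ConcurrentHashMap-backed) and a live ZkClient are all simplifying assumptions, not the actual Kafka implementation.

import scala.collection.mutable

// Simplified stand-in for kafka.cluster.Partition.
case class Partition(brokerId: Int, partId: Int) {
  override def toString: String = s"$brokerId-$partId"
}

object ReleaseOwnershipSketch {

  // Stand-in for ZkUtils.deletePath(zkClient, path); logs instead of touching ZooKeeper.
  def deletePath(znode: String): Unit =
    println("deleted " + znode)

  // Mirrors the patched method: the caller chooses which registry to release
  // (topicRegistry for a normal rebalance, currentTopicRegistry after a failed attempt).
  def releasePartitionOwnership(group: String,
                                localTopicRegistry: mutable.Map[String, mutable.Map[Partition, String]]): Unit = {
    // Snapshot the keys before removing; the real Pool is ConcurrentHashMap-backed,
    // so the original code can safely remove entries while iterating.
    for (topic <- localTopicRegistry.keys.toList) {
      val consumerOwnerDir = "/consumers/" + group + "/owners/" + topic // assumed znode layout
      for (partition <- localTopicRegistry(topic).keys)
        deletePath(consumerOwnerDir + "/" + partition)
      localTopicRegistry.remove(topic)
    }
  }

  def main(args: Array[String]): Unit = {
    // Partitions tentatively claimed during a rebalance attempt; on failure only
    // these are released, as in the new "}else {" branch of rebalance().
    val currentTopicRegistry = mutable.Map(
      "clicks" -> mutable.Map(Partition(0, 0) -> "info", Partition(0, 1) -> "info"))
    releasePartitionOwnership("test-group", currentTopicRegistry)
  }
}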