Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision 1294176)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(working copy)
@@ -413,16 +413,19 @@
       }
     }
 
-    private def releasePartitionOwnership()= {
+    private def deletePartitionOwnershipFromZK(topic: String, partition: String) {
+      val topicDirs = new ZKGroupTopicDirs(group, topic)
+      val znode = topicDirs.consumerOwnerDir + "/" + partition
+      deletePath(zkClient, znode)
+      debug("Consumer " + consumerIdString + " releasing " + znode)
+    }
+
+    private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]])= {
       info("Releasing partition ownership")
-      for ((topic, infos) <- topicRegistry) {
-        val topicDirs = new ZKGroupTopicDirs(group, topic)
-        for(partition <- infos.keys) {
-          val znode = topicDirs.consumerOwnerDir + "/" + partition
-          deletePath(zkClient, znode)
-          debug("Consumer " + consumerIdString + " releasing " + znode)
-        }
-        topicRegistry.remove(topic)
+      for ((topic, infos) <- localTopicRegistry) {
+        for(partition <- infos.keys)
+          deletePartitionOwnershipFromZK(topic, partition.toString)
+        localTopicRegistry.remove(topic)
       }
     }
 
@@ -446,8 +449,6 @@
                 * the value of a child. Just let this go since another rebalance will be triggered.
                 **/
               info("exception during rebalance ", e)
-              /* Explicitly make sure another rebalancing attempt will get triggered. */
-              done = false
           }
           info("end rebalancing consumer " + consumerIdString + " try #" + i)
           if (done) {
@@ -459,8 +460,6 @@
           }
           // stop all fetchers and clear all the queues to avoid data duplication
           closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
-          // release all partitions, reset state and retry
-          releasePartitionOwnership()
           Thread.sleep(config.rebalanceBackoffMs)
         }
       }
@@ -481,7 +480,7 @@
        */
       closeFetchers(cluster, kafkaMessageStreams, myTopicThreadIdsMap)
 
-      releasePartitionOwnership()
+      releasePartitionOwnership(topicRegistry)
 
       var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
       var currentTopicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
@@ -534,8 +533,9 @@
         topicRegistry = currentTopicRegistry
         updateFetcher(cluster, kafkaMessageStreams)
         true
-      }else
+      }else {
         false
+      }
     }
 
     private def closeFetchersForQueues(cluster: Cluster,
@@ -585,6 +585,7 @@
     }
 
     private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
+      var successfullyOwnedPartitions : List[(String, String)] = Nil
      val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
        val topic = partitionOwner._1._1
        val partition = partitionOwner._1._2
@@ -594,6 +595,7 @@
        try {
          createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
          info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
+          successfullyOwnedPartitions ::= (topic, partition)
          true
        }
        catch {
@@ -606,7 +608,11 @@
      }
      val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
      /* even if one of the partition ownership attempt has failed, return false */
-      if(hasPartitionOwnershipFailed > 0) false
+      if(hasPartitionOwnershipFailed > 0) {
+        // remove all paths that we have owned in ZK
+        successfullyOwnedPartitions.foreach(topicAndPartition => deletePartitionOwnershipFromZK(topicAndPartition._1, topicAndPartition._2))
+        false
+      }
      else true
    }
 
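
The behavioral core of the patch is that reflectPartitionOwnershipDecision now remembers which ownership znodes it managed to create and, if any other claim fails, deletes them again via the new deletePartitionOwnershipFromZK helper instead of leaving half-claimed partitions behind until the next rebalance. The sketch below is not part of the patch: it models that claim-then-roll-back pattern against a plain in-memory map, with claimPartition and releasePartition as hypothetical stand-ins for createEphemeralPathExpectConflict and deletePartitionOwnershipFromZK.

import scala.collection.mutable

object OwnershipRollbackSketch {

  // In-memory stand-in for the ZooKeeper ownership znodes:
  // (topic, partition) -> owning consumer thread id.
  private val owners = mutable.Map[(String, String), String]()

  // Hypothetical stand-in for createEphemeralPathExpectConflict: the claim
  // succeeds only if nobody else owns the partition yet.
  private def claimPartition(topic: String, partition: String, threadId: String): Boolean = {
    val key = (topic, partition)
    if (owners.contains(key)) false
    else { owners(key) = threadId; true }
  }

  // Hypothetical stand-in for deletePartitionOwnershipFromZK.
  private def releasePartition(topic: String, partition: String): Unit =
    owners.remove((topic, partition))

  /** Tries to claim every partition in the decision; if any claim fails,
    * releases the claims that did succeed and returns false so the caller
    * can back off and retry from a clean state. */
  def reflectOwnershipDecision(decision: Map[(String, String), String]): Boolean = {
    var successfullyOwned: List[(String, String)] = Nil
    val results = decision.map { case ((topic, partition), threadId) =>
      val owned = claimPartition(topic, partition, threadId)
      if (owned) successfullyOwned = (topic, partition) :: successfullyOwned
      owned
    }
    if (results.exists(owned => !owned)) {
      // at least one claim failed: undo the partial ownership taken above
      successfullyOwned.foreach { case (topic, partition) => releasePartition(topic, partition) }
      false
    } else true
  }

  def main(args: Array[String]): Unit = {
    owners(("events", "0")) = "some-other-consumer"   // pre-existing conflicting owner
    val decision = Map(("events", "0") -> "me-0", ("events", "1") -> "me-1")
    println(reflectOwnershipDecision(decision))       // false: partition 0 was already taken
    println(owners)                                   // partition 1 was rolled back; only the conflict remains
  }
}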