From a3a7fb7f5cc059ce6ed726863aff98efeb2e3084 Mon Sep 17 00:00:00 2001 From: mgharat Date: Thu, 2 Oct 2014 15:41:53 -0700 Subject: [PATCH 1/2] Added a check to see if there are not topics --- core/src/main/scala/kafka/consumer/PartitionAssignor.scala | 5 +++++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 8ea7368..e9162dc 100644 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -71,6 +71,11 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() + val topics = ctx.consumersForTopic.map { case (topic, threadIds) => topic} + if (topics.size <= 0) { + partitionOwnershipDecision + } + // check conditions (a) and (b) val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet) ctx.consumersForTopic.foreach { case (topic, threadIds) => diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index fbc680f..73d226b 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -651,6 +651,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) +// + if(partitionOwnershipDecision.isEmpty) + return true + val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) -- 1.9.3 (Apple Git-50) From 457ed55f666e168662a2ebba0b8d0e15d292598e Mon Sep 17 00:00:00 2001 From: mgharat Date: Sat, 4 Oct 2014 17:40:01 -0700 Subject: [PATCH 2/2] Removed the unnecessary comment --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 73d226b..9b16437 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -651,7 +651,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) -// + if(partitionOwnershipDecision.isEmpty) return true -- 1.9.3 (Apple Git-50)