From 641a9037f27c757fd1b6682c2d481ff742a226a5 Mon Sep 17 00:00:00 2001 From: mgharat Date: Thu, 2 Oct 2014 15:41:53 -0700 Subject: [PATCH 1/3] Added a check to see if there are not topics --- core/src/main/scala/kafka/consumer/PartitionAssignor.scala | 5 +++++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 8ea7368..e9162dc 100644 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -71,6 +71,11 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() + val topics = ctx.consumersForTopic.map { case (topic, threadIds) => topic} + if (topics.size <= 0) { + partitionOwnershipDecision + } + // check conditions (a) and (b) val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet) ctx.consumersForTopic.foreach { case (topic, threadIds) => diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index fbc680f..73d226b 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -651,6 +651,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) +// + if(partitionOwnershipDecision.isEmpty) + return true + val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) -- 1.9.3 (Apple Git-50) From 151b8a9263ba2a800781db79c77a59abc791cbef Mon Sep 17 00:00:00 2001 From: mgharat Date: Sat, 4 Oct 2014 17:40:01 -0700 Subject: [PATCH 2/3] Removed the unnecessary comment --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 73d226b..9b16437 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -651,7 +651,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) -// + if(partitionOwnershipDecision.isEmpty) return true -- 1.9.3 (Apple Git-50) From c5e6da15ee1dee43f8f170eb201a11348e953a3a Mon Sep 17 00:00:00 2001 From: mgharat Date: Wed, 8 Oct 2014 17:27:34 -0700 Subject: [PATCH 3/3] Made a change to the way the condition for no topics is checked --- core/src/main/scala/kafka/consumer/PartitionAssignor.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index e9162dc..639136a 100644 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -71,10 +71,8 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() - val topics = ctx.consumersForTopic.map { case (topic, threadIds) => topic} - if (topics.size <= 0) { + if (ctx.consumersForTopic.size <= 0) partitionOwnershipDecision - } // check conditions (a) and (b) val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet) -- 1.9.3 (Apple Git-50)