From 641a9037f27c757fd1b6682c2d481ff742a226a5 Mon Sep 17 00:00:00 2001 From: mgharat Date: Thu, 2 Oct 2014 15:41:53 -0700 Subject: [PATCH 1/6] Added a check to see if there are not topics --- core/src/main/scala/kafka/consumer/PartitionAssignor.scala | 5 +++++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 8ea7368..e9162dc 100644 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -71,6 +71,11 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() + val topics = ctx.consumersForTopic.map { case (topic, threadIds) => topic} + if (topics.size <= 0) { + partitionOwnershipDecision + } + // check conditions (a) and (b) val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet) ctx.consumersForTopic.foreach { case (topic, threadIds) => diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index fbc680f..73d226b 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -651,6 +651,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) +// + if(partitionOwnershipDecision.isEmpty) + return true + val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) -- 1.9.3 (Apple Git-50) From 151b8a9263ba2a800781db79c77a59abc791cbef Mon Sep 17 00:00:00 2001 From: mgharat Date: Sat, 4 Oct 2014 17:40:01 -0700 Subject: [PATCH 2/6] Removed the unnecessary comment --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 73d226b..9b16437 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -651,7 +651,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) -// + if(partitionOwnershipDecision.isEmpty) return true -- 1.9.3 (Apple Git-50) From c5e6da15ee1dee43f8f170eb201a11348e953a3a Mon Sep 17 00:00:00 2001 From: mgharat Date: Wed, 8 Oct 2014 17:27:34 -0700 Subject: [PATCH 3/6] Made a change to the way the condition for no topics is checked --- core/src/main/scala/kafka/consumer/PartitionAssignor.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index e9162dc..639136a 100644 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -71,10 +71,8 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() - val topics = ctx.consumersForTopic.map { case (topic, threadIds) => topic} - if (topics.size <= 0) { + if (ctx.consumersForTopic.size <= 0) partitionOwnershipDecision - } // check conditions (a) and (b) val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet) -- 1.9.3 (Apple Git-50) From f042f5cc2935a0cd6fa5294c00ce18c2b1b6be16 Mon Sep 17 00:00:00 2001 From: mgharat Date: Wed, 8 Oct 2014 17:46:40 -0700 Subject: [PATCH 4/6] Cleaned unnecessary code and modified the test case to handle the no topic scenario --- .../scala/kafka/consumer/PartitionAssignor.scala | 68 +++++++++++----------- .../consumer/ZookeeperConsumerConnector.scala | 3 - .../kafka/consumer/PartitionAssignorTest.scala | 2 +- 3 files changed, 35 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 639136a..2e10157 100644 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -71,44 +71,44 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() - if (ctx.consumersForTopic.size <= 0) - partitionOwnershipDecision - - // check conditions (a) and (b) - val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet) - ctx.consumersForTopic.foreach { case (topic, threadIds) => - val threadIdSet = threadIds.toSet - require(threadIdSet == headThreadIdSet, - "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " + - "AND if the stream counts across topics are identical for a given consumer instance.\n" + - "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) + - "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet)) - } + if (ctx.consumersForTopic.size > 0) + { + // check conditions (a) and (b) + val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet) + ctx.consumersForTopic.foreach { case (topic, threadIds) => + val threadIdSet = threadIds.toSet + require(threadIdSet == headThreadIdSet, + "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " + + "AND if the stream counts across topics are identical for a given consumer instance.\n" + + "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) + + "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet)) + } - val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted) + val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted) + + info("Starting round-robin assignment with consumers " + ctx.consumers) + val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) => + info("Consumer %s rebalancing the following partitions for topic %s: %s" + .format(ctx.consumerId, topic, partitions)) + partitions.map(partition => { + TopicAndPartition(topic, partition) + }) + }.toSeq.sortWith((topicPartition1, topicPartition2) => { + /* + * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending + * up on one consumer (if it has a high enough stream count). + */ + topicPartition1.toString.hashCode < topicPartition2.toString.hashCode + }) - info("Starting round-robin assignment with consumers " + ctx.consumers) - val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) => - info("Consumer %s rebalancing the following partitions for topic %s: %s" - .format(ctx.consumerId, topic, partitions)) - partitions.map(partition => { - TopicAndPartition(topic, partition) + allTopicPartitions.foreach(topicPartition => { + val threadId = threadAssignor.next() + if (threadId.consumer == ctx.consumerId) + partitionOwnershipDecision += (topicPartition -> threadId) }) - }.toSeq.sortWith((topicPartition1, topicPartition2) => { - /* - * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending - * up on one consumer (if it has a high enough stream count). - */ - topicPartition1.toString.hashCode < topicPartition2.toString.hashCode - }) - - allTopicPartitions.foreach(topicPartition => { - val threadId = threadAssignor.next() - if (threadId.consumer == ctx.consumerId) - partitionOwnershipDecision += (topicPartition -> threadId) - }) - partitionOwnershipDecision + partitionOwnershipDecision + } } } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 9b16437..1fe080c 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -652,9 +652,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) - if(partitionOwnershipDecision.isEmpty) - return true - val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala index 9ceae22..24954de 100644 --- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala @@ -87,7 +87,7 @@ private object PartitionAssignorTest extends Logging { private val MaxConsumerCount = 10 private val MaxStreamCount = 8 private val MaxTopicCount = 100 - private val MinTopicCount = 20 + private val MinTopicCount = 0 private val MaxPartitionCount = 120 private val MinPartitionCount = 8 -- 1.9.3 (Apple Git-50) From 774ce73dddef780b64a717d619ec296f9e0233ef Mon Sep 17 00:00:00 2001 From: mgharat Date: Thu, 9 Oct 2014 11:54:50 -0700 Subject: [PATCH 5/6] Formatted the code --- core/src/main/scala/kafka/consumer/PartitionAssignor.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 2e10157..df30103 100644 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -71,8 +71,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() - if (ctx.consumersForTopic.size > 0) - { + if (ctx.consumersForTopic.size > 0) { // check conditions (a) and (b) val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet) ctx.consumersForTopic.foreach { case (topic, threadIds) => @@ -87,7 +86,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted) info("Starting round-robin assignment with consumers " + ctx.consumers) - val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) => + val allTopicPartitions = ctx.partitionsForTopic.flatMap { case (topic, partitions) => info("Consumer %s rebalancing the following partitions for topic %s: %s" .format(ctx.consumerId, topic, partitions)) partitions.map(partition => { @@ -106,9 +105,10 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { if (threadId.consumer == ctx.consumerId) partitionOwnershipDecision += (topicPartition -> threadId) }) - - partitionOwnershipDecision } + + partitionOwnershipDecision + } } -- 1.9.3 (Apple Git-50) From 8e739f53392f8c1b32456e5f92a535e7b2df32e3 Mon Sep 17 00:00:00 2001 From: mgharat Date: Thu, 9 Oct 2014 11:55:49 -0700 Subject: [PATCH 6/6] Formatted the code --- core/src/main/scala/kafka/consumer/PartitionAssignor.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index df30103..e6ff768 100644 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -108,7 +108,6 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { } partitionOwnershipDecision - } } -- 1.9.3 (Apple Git-50)