From 29a698a2aa97f0936cf128e80e43ce32f34764d4 Mon Sep 17 00:00:00 2001 From: Onur Karaman Date: Thu, 14 May 2015 23:09:39 -0700 Subject: [PATCH] remove roundrobin identical topic constraint in consumer coordinator --- .../src/main/scala/kafka/coordinator/PartitionAssignor.scala | 12 ++++-------- .../scala/unit/kafka/coordinator/PartitionAssignorTest.scala | 11 ++++++++--- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala b/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala index 1069822..8499bf8 100644 --- a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala @@ -66,25 +66,21 @@ private[coordinator] object PartitionAssignor { * The assignment will be: * C0 -> [t0p0, t0p2, t1p1] * C1 -> [t0p1, t1p0, t1p2] - * - * roundrobin assignment is allowed only if the set of subscribed topics is identical for every consumer within the group. */ private[coordinator] class RoundRobinAssignor extends PartitionAssignor { override def assign(topicsPerConsumer: Map[String, Set[String]], partitionsPerTopic: Map[String, Int]): Map[String, Set[TopicAndPartition]] = { - val consumersHaveIdenticalTopics = topicsPerConsumer.values.toSet.size == 1 - require(consumersHaveIdenticalTopics, - "roundrobin assignment is allowed only if all consumers in the group subscribe to the same topics") val consumers = topicsPerConsumer.keys.toSeq.sorted - val topics = topicsPerConsumer.head._2 - val consumerAssignor = CoreUtils.circularIterator(consumers) + val topics = topicsPerConsumer.values.flatten.toSeq.distinct.sorted - val allTopicPartitions = topics.toSeq.flatMap { topic => + val allTopicPartitions = topics.flatMap { topic => val numPartitionsForTopic = partitionsPerTopic(topic) (0 until numPartitionsForTopic).map(partition => TopicAndPartition(topic, partition)) } + var consumerAssignor = CoreUtils.circularIterator(consumers) val consumerPartitionPairs = allTopicPartitions.map { topicAndPartition => + consumerAssignor = consumerAssignor.dropWhile(consumerId => !topicsPerConsumer(consumerId).contains(topicAndPartition.topic)) val consumer = consumerAssignor.next() (consumer, topicAndPartition) } diff --git a/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala index ba6d5cd..887cee5 100644 --- a/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala @@ -259,8 +259,8 @@ class PartitionAssignorTest extends JUnitSuite { assertEquals(expected, actual) } - @Test(expected = classOf[IllegalArgumentException]) - def testRoundRobinAssignorCannotAssignWithMixedTopics() { + @Test + def testRoundRobinAssignorMultipleConsumersMixedTopics() { val topic1 = "topic1" val topic2 = "topic2" val consumer1 = "consumer1" @@ -271,7 +271,12 @@ class PartitionAssignorTest extends JUnitSuite { val assignor = new RoundRobinAssignor() val topicsPerConsumer = Map(consumer1 -> Set(topic1), consumer2 -> Set(topic1, topic2), consumer3 -> Set(topic1)) val partitionsPerTopic = Map(topic1 -> numTopic1Partitions, topic2 -> numTopic2Partitions) - assignor.assign(topicsPerConsumer, partitionsPerTopic) + val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) + val expected = Map( + consumer1 -> topicAndPartitions(Map(topic1 -> Set(0))), + consumer2 -> topicAndPartitions(Map(topic1 -> Set(1), topic2 -> Set(0, 1))), + consumer3 -> topicAndPartitions(Map(topic1 -> Set(2)))) + assertEquals(expected, actual) } @Test -- 1.9.3 (Apple Git-50)