From 632e095c00b0b2b6713ab929066b5a3fd796b07e Mon Sep 17 00:00:00 2001
From: Andrew Olson
Date: Fri, 14 Aug 2015 10:04:33 -0500
Subject: [PATCH] Fair assignment strategy

---
 .../kafka/clients/consumer/ConsumerConfig.java     |   2 +-
 .../main/scala/kafka/consumer/ConsumerConfig.scala |   7 +-
 .../scala/kafka/consumer/PartitionAssignor.scala   |  70 ++++++-
 .../kafka/coordinator/PartitionAssignor.scala      |  53 ++++-
 .../kafka/consumer/PartitionAssignorTest.scala     |  78 ++++++++
 .../kafka/coordinator/PartitionAssignorTest.scala  | 220 +++++++++++++++++++++
 6 files changed, 422 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index d35b421..00a5873 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -185,7 +185,7 @@ public class ConsumerConfig extends AbstractConfig {
                                 .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                                         Type.STRING,
                                         "range",
-                                        in("range", "roundrobin"),
+                                        in("range", "roundrobin", "fair"),
                                         Importance.MEDIUM,
                                         PARTITION_ASSIGNMENT_STRATEGY_DOC)
                                 .define(METADATA_MAX_AGE_CONFIG,
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 97a56ce..d8071f4 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -49,7 +49,7 @@ object ConsumerConfig extends Config {
   val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
   val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
   val ExcludeInternalTopics = true
-  val DefaultPartitionAssignmentStrategy = "range" /* select between "range", and "roundrobin" */
+  val DefaultPartitionAssignmentStrategy = "range" /* select between "range", "roundrobin", and "fair" */
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
   val DefaultClientId = ""
 
@@ -91,8 +91,9 @@ object ConsumerConfig extends Config {
     strategy match {
       case "range" =>
       case "roundrobin" =>
+      case "fair" =>
       case _ => throw new InvalidConfigException("Wrong value " + strategy + " of partition.assignment.strategy in consumer config; " +
-        "Valid values are 'range' and 'roundrobin'")
+        "Valid values are 'range', 'roundrobin', and 'fair'")
     }
   }
 }
@@ -188,7 +189,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
   val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics)
 
-  /** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */
+  /** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin, fair */
   val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy)
 
   validate(this)
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 849284a..eaaa1f1 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -37,6 +37,7 @@ trait PartitionAssignor {
 object PartitionAssignor {
   def createInstance(assignmentStrategy: String) = assignmentStrategy match {
     case "roundrobin" => new RoundRobinAssignor()
+    case "fair" => new FairAssignor()
     case _ => new RangeAssignor()
   }
 }
@@ -47,12 +48,13 @@ class AssignmentContext(group: String, val consumerId: String, excludeInternalTo
     myTopicCount.getConsumerThreadIdsPerTopic
   }
 
-  val partitionsForTopic: collection.Map[String, Seq[Int]] =
-    ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIds.keySet.toSeq)
-
   val consumersForTopic: collection.Map[String, List[ConsumerThreadId]] =
     ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics)
 
+  // Some assignment strategies require knowledge of all topics consumed by any member of the group
+  val partitionsForTopic: collection.Map[String, Seq[Int]] =
+    ZkUtils.getPartitionsForTopics(zkClient, consumersForTopic.keySet.toSeq)
+
   val consumers: Seq[String] = ZkUtils.getConsumersInGroup(zkClient, group).sorted
 }
 
@@ -173,3 +175,65 @@ class RangeAssignor() extends PartitionAssignor with Logging {
     partitionAssignment
   }
 }
+
+/**
+ * The fair assignor attempts to balance partitions across consumers such that each consumer thread is assigned approximately
+ * the same number of partitions, even if the consumer topic subscriptions are substantially different (if they are identical,
+ * then the result will be equivalent to that of the roundrobin assignor). The running total of assignments per consumer
+ * thread is tracked as the algorithm executes in order to accomplish this.
+ *
+ * The algorithm starts with the topic with the fewest consumer subscriptions, and assigns its partitions in roundrobin
+ * fashion. In the event of a tie for least subscriptions, the topic with the highest partition count is assigned first, as
+ * this generally creates a more balanced distribution. The final tiebreaker is the topic name.
+ *
+ * The partitions for subsequent topics are assigned to the subscribing consumer with the fewest number of assignments.
+ * In the event of a tie for least assignments, the tiebreaker is the consumer id, so that the assignment pattern is fairly
+ * similar to how the roundrobin assignor functions.
+ *
+ * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
+ * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. If both C0 and C1 are consuming t0, but only C1 is
+ * consuming t1 then the assignment will be:
+ * C0 -> [t0p0, t0p1, t0p2]
+ * C1 -> [t1p0, t1p1, t1p2]
+ */
+class FairAssignor() extends PartitionAssignor with Logging {
+
+  def assign(ctx: AssignmentContext) = {
+    val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
+    val partitionAssignment =
+      new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
+
+    if (ctx.consumersForTopic.nonEmpty) {
+      // Running total of partitions assigned to each consumer thread; every known thread starts at zero
+      val consumerAssignmentCounts: mutable.Map[ConsumerThreadId, Int] = mutable.Map()
+      for (threadId <- ctx.consumersForTopic.values.flatten) {
+        consumerAssignmentCounts(threadId) = 0
+      }
+
+      // Assign topics with fewer consumers first, tiebreakers are most partitions, then topic name
+      val orderedTopics = ctx.consumersForTopic.toList.sortBy { case (topic, threadIds) =>
+        (threadIds.size, -ctx.partitionsForTopic(topic).size, topic)
+      }
+
+      for ((topic, threadIds) <- orderedTopics) {
+        val partitions = ctx.partitionsForTopic(topic)
+        info("Consumer %s rebalancing the following partitions for topic %s: %s"
+          .format(ctx.consumerId, topic, partitions))
+
+        for (partition <- partitions) {
+          // Assign partition to the subscribed thread with least assignments, tiebreaker is consumer thread id
+          val threadId = threadIds.minBy(candidate => (consumerAssignmentCounts(candidate), candidate))
+          consumerAssignmentCounts(threadId) += 1
+
+          // record the partition ownership decision
+          val assignmentForConsumer = partitionAssignment.getAndMaybePut(threadId.consumer)
+          assignmentForConsumer += (TopicAndPartition(topic, partition) -> threadId)
+        }
+      }
+    }
+
+    // assign Map.empty for the consumers which are not associated with topic partitions
+    ctx.consumers.foreach(consumerId => partitionAssignment.getAndMaybePut(consumerId))
+    partitionAssignment
+  }
+}
diff --git a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala b/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
index 8499bf8..9b13ee8 100644
--- a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
@@ -20,6 +20,8 @@ package kafka.coordinator
 import kafka.common.TopicAndPartition
 import kafka.utils.CoreUtils
 
+import scala.collection.mutable
+
 private[coordinator] trait PartitionAssignor {
   /**
    * Assigns partitions to consumers in a group.
@@ -46,10 +48,11 @@ private[coordinator] trait PartitionAssignor {
 }
 
 private[coordinator] object PartitionAssignor {
-  val strategies = Set("range", "roundrobin")
+  val strategies = Set("range", "roundrobin", "fair")
 
   def createInstance(strategy: String) = strategy match {
     case "roundrobin" => new RoundRobinAssignor()
+    case "fair" => new FairAssignor()
     case _ => new RangeAssignor()
   }
 }
@@ -123,3 +126,51 @@ private[coordinator] class RangeAssignor extends PartitionAssignor {
     fill(aggregate(consumerPartitionPairs), topicsPerConsumer.keySet)
   }
 }
+
+/**
+ * The fair assignor attempts to balance partitions across consumers such that each consumer is assigned approximately
+ * the same number of partitions, even if the consumer topic subscriptions are substantially different (if they are identical,
+ * then the result will be equivalent to that of the roundrobin assignor). The running total of assignments per consumer
+ * is tracked as the algorithm executes in order to accomplish this.
+ *
+ * The algorithm starts with the topic with the fewest consumer subscriptions, and assigns its partitions in roundrobin
+ * fashion. In the event of a tie for least subscriptions, the topic with the highest partition count is assigned first, as
+ * this generally creates a more balanced distribution. The final tiebreaker is the topic name.
+ *
+ * The partitions for subsequent topics are assigned to the subscribing consumer with the fewest number of assignments.
+ * In the event of a tie for least assignments, the tiebreaker is the consumer id, so that the assignment pattern is fairly
+ * similar to how the roundrobin assignor functions.
+ *
+ * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
+ * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. If both C0 and C1 are consuming t0, but only C1 is
+ * consuming t1 then the assignment will be:
+ * C0 -> [t0p0, t0p1, t0p2]
+ * C1 -> [t1p0, t1p1, t1p2]
+ */
+private[coordinator] class FairAssignor extends PartitionAssignor {
+  override def assign(topicsPerConsumer: Map[String, Set[String]],
+                      partitionsPerTopic: Map[String, Int]): Map[String, Set[TopicAndPartition]] = {
+    val consumersPerTopic = invert(topicsPerConsumer)
+
+    // Running total of partitions assigned to each consumer; every consumer starts at zero
+    val consumerAssignmentCounts: mutable.Map[String, Int] = mutable.Map()
+    for (consumer <- topicsPerConsumer.keys) {
+      consumerAssignmentCounts(consumer) = 0
+    }
+
+    // Assign topics with fewer consumers first, tiebreakers are most partitions, then topic name
+    val orderedTopics = consumersPerTopic.toList.sortBy { case (topic, topicConsumers) =>
+      (topicConsumers.size, -partitionsPerTopic(topic), topic)
+    }
+
+    val consumerPartitionPairs = orderedTopics.flatMap { case (topic, topicConsumers) =>
+      (0 until partitionsPerTopic(topic)).map { partition =>
+        // Assign partition to the subscribed consumer with least assignments, tiebreaker is consumer id
+        val consumer = topicConsumers.minBy(candidate => (consumerAssignmentCounts(candidate), candidate))
+        consumerAssignmentCounts(consumer) += 1
+        (consumer, TopicAndPartition(topic, partition))
+      }
+    }
+    fill(aggregate(consumerPartitionPairs), topicsPerConsumer.keySet)
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index adf0801..b77138b 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -77,6 +77,84 @@ class PartitionAssignorTest extends JUnit3Suite with Logging {
       PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient)
     })
   }
+
+  def testFairPartitionAssignor() {
+    val assignor = new FairAssignor
+
+    /** various scenarios with only wildcard consumers */
+    (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+      val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
+      val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
+
+      val topicPartitionCounts = Map((1 to topicCount).map(topic => {
+        ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
+      }).toSeq:_*)
+
+      val subscriptions = Map((1 to consumerCount).map(consumer => {
+        val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
+        ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true))
+      }).toSeq:_*)
+      val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
+      val zkClient = PartitionAssignorTest.setupZkClientMock(scenario)
+      EasyMock.replay(zkClient)
+      PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient, verifyAssignmentIsUniform = true)
+    })
+  }
+
+  def testFairPartitionAssignorStaticSubscriptions() {
+    val assignor = new FairAssignor
+
+    /** test static subscription scenarios */
+    (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+      val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
+      val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
+
+      val topicPartitionCounts = Map((1 to topicCount).map(topic => {
+        ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
+      }).toSeq:_*)
+
+      val subscriptions = Map((1 to consumerCount).map(consumer => {
+        val streamCounts = Map((1 to topicCount).map(topic => {
+          ("topic-" + topic, 1)
+        }).toSeq:_*)
+        ("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
+      }).toSeq:_*)
+      val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
+      val zkClient = PartitionAssignorTest.setupZkClientMock(scenario)
+      EasyMock.replay(zkClient)
+
+      PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient, verifyAssignmentIsUniform = true)
+    })
+  }
+
+  def testFairPartitionAssignorUnbalancedStaticSubscriptions() {
+    val assignor = new FairAssignor
+    val minConsumerCount = 5
+
+    /** test unbalanced static subscription scenarios */
+    (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+      val consumerCount = minConsumerCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
+      val topicCount = 10
+
+      val topicPartitionCounts = Map((1 to topicCount).map(topic => {
+        ("topic-" + topic, 10)
+      }).toSeq:_*)
+
+      val subscriptions = Map((1 to consumerCount).map(consumer => {
+        // Exclude some topics from some consumers
+        val topicRange = (1 to topicCount - consumer % minConsumerCount)
+        val streamCounts = Map(topicRange.map(topic => {
+          ("topic-" + topic, 3)
+        }).toSeq:_*)
+        ("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
+      }).toSeq:_*)
+      val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
+      val zkClient = PartitionAssignorTest.setupZkClientMock(scenario)
+      EasyMock.replay(zkClient)
+
+      PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient, verifyAssignmentIsUniform = true)
+    })
+  }
 }
 
 private object PartitionAssignorTest extends Logging {
diff --git a/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala
index 887cee5..fc3ad97 100644
--- a/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala
@@ -162,6 +162,34 @@ class PartitionAssignorTest extends JUnitSuite {
   }
 
   @Test
+  def testRangeAssignorMultipleConsumersUnbalancedSubscriptions() {
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+    val topic3 = "topic3"
+    val topic4 = "topic4"
+    val topic5 = "topic5"
+    val consumer1 = "consumer1"
+    val consumer2 = "consumer2"
+    val consumer3 = "consumer3"
+    val consumer4 = "consumer4"
+    val oddTopicPartitions = 2
+    val evenTopicPartitions = 1
+    val assignor = new RangeAssignor()
+    val oddTopics = Set(topic1, topic3, topic5)
+    val allTopics = Set(topic1, topic2, topic3, topic4, topic5)
+    val topicsPerConsumer = Map(consumer1 -> allTopics, consumer2 -> oddTopics, consumer3 -> oddTopics, consumer4 -> allTopics)
+    val partitionsPerTopic = Map(topic1 -> oddTopicPartitions, topic2 -> evenTopicPartitions, topic3 -> oddTopicPartitions,
+      topic4 -> evenTopicPartitions, topic5 -> oddTopicPartitions)
+    val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic)
+    val expected = Map(
+      consumer1 -> topicAndPartitions(Map(topic1 -> Set(0), topic2 -> Set(0), topic3 -> Set(0), topic4 -> Set(0), topic5 -> Set(0))),
+      consumer2 -> topicAndPartitions(Map(topic1 -> Set(1), topic3 -> Set(1), topic5 -> Set(1))),
+      consumer3 -> Set.empty[TopicAndPartition],
+      consumer4 -> Set.empty[TopicAndPartition])
+    assertEquals(expected, actual)
+  }
+
+  @Test
   def testRoundRobinAssignorOneConsumerNoTopic() {
     val consumer = "consumer"
     val assignor = new RoundRobinAssignor()
@@ -297,6 +325,198 @@ class PartitionAssignorTest extends JUnitSuite {
     assertEquals(expected, actual)
   }
 
+  @Test
+  def testRoundRobinAssignorMultipleConsumersUnbalancedSubscriptions() {
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+    val topic3 = "topic3"
+    val topic4 = "topic4"
+    val topic5 = "topic5"
+    val consumer1 = "consumer1"
+    val consumer2 = "consumer2"
+    val consumer3 = "consumer3"
+    val consumer4 = "consumer4"
+    val oddTopicPartitions = 2
+    val evenTopicPartitions = 1
+    val assignor = new RoundRobinAssignor()
+    val oddTopics = Set(topic1, topic3, topic5)
+    val allTopics = Set(topic1, topic2, topic3, topic4, topic5)
+    val topicsPerConsumer = Map(consumer1 -> allTopics, consumer2 -> oddTopics, consumer3 -> oddTopics, consumer4 -> allTopics)
+    val partitionsPerTopic = Map(topic1 -> oddTopicPartitions, topic2 -> evenTopicPartitions, topic3 -> oddTopicPartitions,
+      topic4 -> evenTopicPartitions, topic5 -> oddTopicPartitions)
+    val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic)
+    val expected = Map(
+      consumer1 -> topicAndPartitions(Map(topic1 -> Set(0), topic3 -> Set(0), topic5 -> Set(0))),
+      consumer2 -> topicAndPartitions(Map(topic1 -> Set(1), topic3 -> Set(1), topic5 -> Set(1))),
+      consumer3 -> Set.empty[TopicAndPartition],
+      consumer4 -> topicAndPartitions(Map(topic2 -> Set(0), topic4 -> Set(0))))
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def testFairAssignorOneConsumerNoTopic() {
+    val consumer = "consumer"
+    val assignor = new FairAssignor()
+    val topicsPerConsumer = Map(consumer -> Set.empty[String])
+    val partitionsPerTopic = Map.empty[String, Int]
+    val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic)
+    val expected = Map(consumer -> Set.empty[TopicAndPartition])
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def testFairAssignorOneConsumerNonexistentTopic() {
+    val topic = "topic"
+    val consumer = "consumer"
+    val assignor = new FairAssignor()
+    val topicsPerConsumer = Map(consumer -> Set(topic))
+    val partitionsPerTopic = Map(topic -> 0)
+    val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic)
+    val expected = Map(consumer -> Set.empty[TopicAndPartition])
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def testFairAssignorOneConsumerOneTopic() {
+    val topic = "topic"
+    val consumer = "consumer"
+    val numPartitions = 3
+    val assignor = new FairAssignor()
+    val topicsPerConsumer = Map(consumer -> Set(topic))
+    val partitionsPerTopic = Map(topic -> numPartitions)
+    val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic)
+    val expected = Map(consumer -> topicAndPartitions(Map(topic -> Set(0, 1, 2))))
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def testFairAssignorOnlyAssignsPartitionsFromSubscribedTopics() {
+    val subscribedTopic = "topic"
+    val otherTopic = "other"
+    val consumer = "consumer"
+    val subscribedTopicNumPartitions = 3
+    val otherTopicNumPartitions = 3
+    val assignor = new FairAssignor()
+    val topicsPerConsumer = Map(consumer -> Set(subscribedTopic))
+    val partitionsPerTopic = Map(subscribedTopic -> subscribedTopicNumPartitions, otherTopic -> otherTopicNumPartitions)
+    val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic)
+    val expected = Map(consumer -> topicAndPartitions(Map(subscribedTopic -> Set(0, 1, 2))))
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def testFairAssignorOneConsumerMultipleTopics() {
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+    val consumer = "consumer"
+    val numTopic1Partitions = 1
+    val numTopic2Partitions = 2
+    val assignor = new FairAssignor()
+    val topicsPerConsumer = Map(consumer -> Set(topic1, topic2))
+    val partitionsPerTopic = Map(topic1 -> numTopic1Partitions, topic2 -> numTopic2Partitions)
+    val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic)
+    val expected = Map(consumer -> topicAndPartitions(Map(topic1 -> Set(0), topic2 -> Set(0, 1))))
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def testFairAssignorTwoConsumersOneTopicOnePartition() {
+    val topic = "topic"
+    val consumer1 = "consumer1"
+    val consumer2 = "consumer2"
+    val numPartitions = 1
+    val assignor = new FairAssignor()
+    val topicsPerConsumer = Map(consumer1 -> Set(topic), consumer2 -> Set(topic))
+    val partitionsPerTopic = Map(topic -> numPartitions)
+    val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic)
+    val expected = Map(
+      consumer1 -> topicAndPartitions(Map(topic -> Set(0))),
+      consumer2 -> Set.empty[TopicAndPartition])
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def testFairAssignorTwoConsumersOneTopicTwoPartitions() {
+    val topic = "topic"
+    val consumer1 = "consumer1"
+    val consumer2 = "consumer2"
+    val numPartitions = 2
+    val assignor = new FairAssignor()
+    val topicsPerConsumer = Map(consumer1 -> Set(topic), consumer2 -> Set(topic))
+    val partitionsPerTopic = Map(topic -> numPartitions)
+    val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic)
+    val expected = Map(
+      consumer1 -> topicAndPartitions(Map(topic -> Set(0))),
+      consumer2 -> topicAndPartitions(Map(topic -> Set(1))))
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def testFairAssignorMultipleConsumersMixedTopics() {
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+    val consumer1 = "consumer1"
+    val consumer2 = "consumer2"
+    val consumer3 = "consumer3"
+    val numTopic1Partitions = 3
+    val numTopic2Partitions = 2
+    val assignor = new FairAssignor()
+    val topicsPerConsumer = Map(consumer1 -> Set(topic1), consumer2 -> Set(topic1, topic2), consumer3 -> Set(topic1))
+    val partitionsPerTopic = Map(topic1 -> numTopic1Partitions, topic2 -> numTopic2Partitions)
+    val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic)
+    val expected = Map(
+      consumer1 -> topicAndPartitions(Map(topic1 -> Set(0, 2))),
+      consumer2 -> topicAndPartitions(Map(topic2 -> Set(0, 1))),
+      consumer3 -> topicAndPartitions(Map(topic1 -> Set(1))))
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def testFairAssignorTwoConsumersTwoTopicsSixPartitions() {
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+    val consumer1 = "consumer1"
+    val consumer2 = "consumer2"
+    val numTopic1Partitions = 3
+    val numTopic2Partitions = 3
+    val assignor = new FairAssignor()
+    val topicsPerConsumer = Map(consumer1 -> Set(topic1, topic2), consumer2 -> Set(topic1, topic2))
+    val partitionsPerTopic = Map(topic1 -> numTopic1Partitions, topic2 -> numTopic2Partitions)
+    val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic)
+    val expected = Map(
+      consumer1 -> topicAndPartitions(Map(topic1 -> Set(0, 2), topic2 -> Set(1))),
+      consumer2 -> topicAndPartitions(Map(topic1 -> Set(1), topic2 -> Set(0, 2))))
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def testFairAssignorMultipleConsumersUnbalancedSubscriptions() {
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+    val topic3 = "topic3"
+    val topic4 = "topic4"
+    val topic5 = "topic5"
+    val consumer1 = "consumer1"
+    val consumer2 = "consumer2"
+    val consumer3 = "consumer3"
+    val consumer4 = "consumer4"
+    val oddTopicPartitions = 2
+    val evenTopicPartitions = 1
+    val assignor = new FairAssignor()
+    val oddTopics = Set(topic1, topic3, topic5)
+    val allTopics = Set(topic1, topic2, topic3, topic4, topic5)
+    val topicsPerConsumer = Map(consumer1 -> allTopics, consumer2 -> oddTopics, consumer3 -> oddTopics, consumer4 -> allTopics)
+    val partitionsPerTopic = Map(topic1 -> oddTopicPartitions, topic2 -> evenTopicPartitions, topic3 -> oddTopicPartitions,
+      topic4 -> evenTopicPartitions, topic5 -> oddTopicPartitions)
+    val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic)
+    val expected = Map(
+      consumer1 -> topicAndPartitions(Map(topic2 -> Set(0), topic3 -> Set(0))),
+      consumer2 -> topicAndPartitions(Map(topic1 -> Set(0), topic3 -> Set(1))),
+      consumer3 -> topicAndPartitions(Map(topic1 -> Set(1), topic5 -> Set(0))),
+      consumer4 -> topicAndPartitions(Map(topic4 -> Set(0), topic5 -> Set(1))))
+    assertEquals(expected, actual)
+  }
+
   private def topicAndPartitions(topicPartitions: Map[String, Set[Int]]): Set[TopicAndPartition] = {
     topicPartitions.flatMap { case (topic, partitions) =>
       partitions.map(partition => TopicAndPartition(topic, partition))
-- 
2.3.5