From 176fbc6eb7341bbae57bdf86824b4f76c83ba35b Mon Sep 17 00:00:00 2001 From: Andrew Olson Date: Fri, 14 Aug 2015 09:17:29 -0500 Subject: [PATCH] Remove identical topic subscription constraint for roundrobin strategy in old consumer API --- .../scala/kafka/consumer/PartitionAssignor.scala | 27 ++++------- .../consumer/ZookeeperConsumerConnector.scala | 2 +- .../kafka/consumer/PartitionAssignorTest.scala | 55 ++++++++++++++++++++++ 3 files changed, 64 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 849284a..c783e51 100755 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -47,12 +47,13 @@ class AssignmentContext(group: String, val consumerId: String, excludeInternalTo myTopicCount.getConsumerThreadIdsPerTopic } - val partitionsForTopic: collection.Map[String, Seq[Int]] = - ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIds.keySet.toSeq) - val consumersForTopic: collection.Map[String, List[ConsumerThreadId]] = ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics) + // Some assignment strategies require knowledge of all topics consumed by any member of the group + val partitionsForTopic: collection.Map[String, Seq[Int]] = + ZkUtils.getPartitionsForTopics(zkClient, consumersForTopic.keySet.toSeq) + val consumers: Seq[String] = ZkUtils.getConsumersInGroup(zkClient, group).sorted } @@ -61,13 +62,7 @@ class AssignmentContext(group: String, val consumerId: String, excludeInternalTo * then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts * will be within a delta of exactly one across all consumer threads.) - * - * (For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance - * and thread-id within that instance. Therefore, round-robin assignment is allowed only if: - * a) Every topic has the same number of streams within a consumer instance - * b) The set of subscribed topics is identical for every consumer instance within the group. */ - class RoundRobinAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { @@ -77,18 +72,11 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory)) if (ctx.consumersForTopic.size > 0) { - // check conditions (a) and (b) - val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet) - ctx.consumersForTopic.foreach { case (topic, threadIds) => - val threadIdSet = threadIds.toSet - require(threadIdSet == headThreadIdSet, - "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " + - "AND if the stream counts across topics are identical for a given consumer instance.\n" + - "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) + - "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet)) + val allThreadIds = ctx.consumersForTopic.flatMap { case (topic, threadIds) => + threadIds } - val threadAssignor = CoreUtils.circularIterator(headThreadIdSet.toSeq.sorted) + var threadAssignor = CoreUtils.circularIterator(allThreadIds) info("Starting round-robin assignment with consumers " + ctx.consumers) val allTopicPartitions = ctx.partitionsForTopic.flatMap { case (topic, partitions) => @@ -106,6 +94,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { }) allTopicPartitions.foreach(topicPartition => { + threadAssignor = threadAssignor.dropWhile(threadId => !ctx.consumersForTopic(topicPartition.topic).contains(threadId)) val threadId = threadAssignor.next() // record the partition ownership decision val assignmentForConsumer = partitionAssignment.getAndMaybePut(threadId.consumer) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index e42d104..9148180 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -836,7 +836,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } catch { case e: ZkNodeExistsException => // The node hasn't been deleted by the original owner. So wait a bit and retry. - info("waiting for the partition ownership to be deleted: " + partition) + info("waiting for the partition ownership to be deleted: " + partition + " for topic " + topic) false case e2: Throwable => throw e2 } diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala index adf0801..013d357 100644 --- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala @@ -53,6 +53,61 @@ class PartitionAssignorTest extends JUnit3Suite with Logging { }) } + def testRoundRobinPartitionAssignorStaticSubscriptions() { + val assignor = new RoundRobinAssignor + + /** test static subscription scenarios */ + (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => { + val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1)) + val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1)) + + val topicPartitionCounts = Map((1 to topicCount).map(topic => { + ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount))) + }).toSeq:_*) + + val subscriptions = Map((1 to consumerCount).map(consumer => { + val streamCounts = Map((1 to topicCount).map(topic => { + ("topic-" + topic, 1) + }).toSeq:_*) + ("g1c" + consumer, StaticSubscriptionInfo(streamCounts)) + }).toSeq:_*) + val scenario = Scenario("g1", topicPartitionCounts, subscriptions) + val zkClient = PartitionAssignorTest.setupZkClientMock(scenario) + EasyMock.replay(zkClient) + + PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient, verifyAssignmentIsUniform = true) + }) + } + + def testRoundRobinPartitionAssignorUnbalancedStaticSubscriptions() { + val assignor = new RoundRobinAssignor + val minConsumerCount = 5 + + /** test unbalanced static subscription scenarios */ + (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => { + val consumerCount = minConsumerCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1)) + val topicCount = 10 + + val topicPartitionCounts = Map((1 to topicCount).map(topic => { + ("topic-" + topic, 10) + }).toSeq:_*) + + val subscriptions = Map((1 to consumerCount).map(consumer => { + // Exclude some topics from some consumers + val topicRange = (1 to topicCount - consumer % minConsumerCount) + val streamCounts = Map(topicRange.map(topic => { + ("topic-" + topic, 3) + }).toSeq:_*) + ("g1c" + consumer, StaticSubscriptionInfo(streamCounts)) + }).toSeq:_*) + val scenario = Scenario("g1", topicPartitionCounts, subscriptions) + val zkClient = PartitionAssignorTest.setupZkClientMock(scenario) + EasyMock.replay(zkClient) + + PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient) + }) + } + def testRangePartitionAssignor() { val assignor = new RangeAssignor (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => { -- 2.3.5