From 30eb5163bb9d97ab2d2d0a08497d40c70a462a41 Mon Sep 17 00:00:00 2001 From: Joel Koshy Date: Wed, 20 Aug 2014 18:09:13 -0700 Subject: [PATCH] v4 --- .../main/scala/kafka/consumer/ConsumerConfig.scala | 4 + .../scala/kafka/consumer/PartitionAssignor.scala | 187 +++++++++++++ .../src/main/scala/kafka/consumer/TopicCount.scala | 39 +-- .../consumer/ZookeeperConsumerConnector.scala | 95 +++---- .../scala/kafka/metrics/KafkaMetricsGroup.scala | 2 + core/src/main/scala/kafka/utils/ZkUtils.scala | 6 +- .../kafka/consumer/PartitionAssignorTest.scala | 299 +++++++++++++++++++++ 7 files changed, 559 insertions(+), 73 deletions(-) create mode 100644 core/src/main/scala/kafka/consumer/PartitionAssignor.scala create mode 100644 core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index 1cf2f62..9715317 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -49,6 +49,7 @@ object ConsumerConfig extends Config { val MirrorTopicsWhitelistProp = "mirror.topics.whitelist" val MirrorTopicsBlacklistProp = "mirror.topics.blacklist" val ExcludeInternalTopics = true + val PartitionAssignmentStrategy = "range" /* select between "range" and "roundrobin" */ val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads" val DefaultClientId = "" @@ -175,6 +176,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( /** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */ val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics) + /** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */ + val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", PartitionAssignmentStrategy) + validate(this) } diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala new file mode 100644 index 0000000..fb58652 --- /dev/null +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.consumer + +import org.I0Itec.zkclient.ZkClient +import kafka.common.TopicAndPartition +import kafka.utils.{Utils, ZkUtils, Logging} + +trait PartitionAssignor { + + /** + * Assigns partitions to all consumer instances in a group. + * @return The assignment of partition to consumer instance and thread.
+ */ + def assign(ctx: AssignmentContext): scala.collection.Map[TopicAndPartition, ConsumerThreadId] + +} + +object PartitionAssignor { + def createInstance(assignmentStrategy: String, group: String, consumerId: String, excludeInternalTopics: Boolean, + zkClient: ZkClient) = assignmentStrategy match { + case "roundrobin" => new RoundRobinAssignor(group, consumerId, excludeInternalTopics, zkClient) + case _ => new RangeAssignor(group, consumerId, excludeInternalTopics, zkClient) + } +} + +class AssignmentContext(val myTopicThreadIds: collection.Map[String, collection.Set[ConsumerThreadId]], + val partitionsForTopic: collection.Map[String, Seq[Int]], + val consumersForTopic: collection.Map[String, List[ConsumerThreadId]], + val consumers: collection.Seq[String]) { + def this(group: String, consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient) = { + this( + { /* myTopicThreadIds */ + val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkClient, excludeInternalTopics) + myTopicCount.getConsumerThreadIdsPerTopic + }, + { /* partitionsForTopic */ + val myTopicThreadIds = TopicCount.constructTopicCount(group, consumerId, zkClient, excludeInternalTopics).getConsumerThreadIdsPerTopic + val partitionsAssignmentPerTopic = ZkUtils.getPartitionAssignmentForTopics(zkClient, myTopicThreadIds.keySet.toSeq) + partitionsAssignmentPerTopic.map { case (topic, replicaAssignment) => + (topic, replicaAssignment.keySet.toSeq.sorted) + }.toMap + }, + /* consumersForTopic */ + ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics).toMap, + /* consumers */ + ZkUtils.getConsumersInGroup(zkClient, group).sorted) + } +} + +/** + * The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It + * then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer + * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts + * will be within a delta of exactly one across all consumer threads.) + * + * (For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance + * and thread-id within that instance. Therefore, round-robin assignment is allowed only if: + * a) Every topic-partition has the same number of streams within a consumer instance + * b) The set of subscribed topics is identical for every consumer instance within the group. 
+ * + * @param group Consumer group + * @param consumerId Consumer id of the consumer in the group for which we are doing the assignment + * @param excludeInternalTopics Whether or not to include internal topics + * @param zkClient zkclient + */ + +class RoundRobinAssignor(group: String, consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient) + extends PartitionAssignor with Logging { + + def assign(ctx: AssignmentContext) = assign(ctx, Set(consumerId)) + + def assign(ctx: AssignmentContext, forConsumers: Set[String]) = { + val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() + + // check conditions (a) and (b) + val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet) + ctx.consumersForTopic.foreach { case (topic, threadIds) => + val threadIdSet = threadIds.toSet + require(threadIdSet == headThreadIdSet, + "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " + + "AND if the stream counts across topics are identical for a given consumer instance.\n" + + "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) + + "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet)) + } + + val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted) + + val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) => + info("Consumer %s using round-robin assignment to rebalance the following partitions: %s for topic %s with consumers: %s" + .format(consumerId, partitions, topic, ctx.consumers)) + partitions.map(partition => { + TopicAndPartition(topic, partition) + }) + }.toSeq.sortWith((topicPartition1, topicPartition2) => { + /** + * Sample ordering: topic0-0, topic1-0, ..., topic0-1, topic1-1, ... + * This helps reduce the likelihood of all partitions of a given topic ending up on one consumer (if it has a high + * enough stream count). + */ + if (topicPartition1.partition == topicPartition2.partition) + topicPartition1.topic.compareTo(topicPartition2.topic) < 0 + else + topicPartition1.partition < topicPartition2.partition + }) + + allTopicPartitions.foreach(topicPartition => { + val threadId = threadAssignor.next() + if (forConsumers.contains(threadId.consumer)) + partitionOwnershipDecision += (topicPartition -> threadId) + }) + + partitionOwnershipDecision + } +} + +/** + * Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order + * and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of + * consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly + * divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C1 + * and C2 with two streams each, and there are five available partitions (p0, p1, p2, p3, p4). So each consumer thread + * will get at least one partition and the first consumer thread will get one extra partition. 
So the assignment will be: + * p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1 + * + * @param group Consumer group + * @param consumerId Consumer id of the consumer in the group for which we are doing the assignment + * @param excludeInternalTopics Whether or not to include internal topics + * @param zkClient zkclient + */ +class RangeAssignor(group: String, consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient) + extends PartitionAssignor with Logging { + + def assign(ctx: AssignmentContext) = { + val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() + + for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) { + val curConsumers = ctx.consumersForTopic(topic) + val curPartitions: Seq[Int] = ctx.partitionsForTopic.get(topic).get + + val nPartsPerConsumer = curPartitions.size / curConsumers.size + val nConsumersWithExtraPart = curPartitions.size % curConsumers.size + + info("Consumer " + consumerId + " rebalancing the following partitions: " + curPartitions + + " for topic " + topic + " with consumers: " + curConsumers) + + for (consumerThreadId <- consumerThreadIdSet) { + val myConsumerPosition = curConsumers.indexOf(consumerThreadId) + assert(myConsumerPosition >= 0) + val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) + val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1) + + /** + * Range-partition the sorted partitions to consumers for better locality. + * The first few consumers pick up an extra partition, if any. + */ + if (nParts <= 0) + warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic) + else { + for (i <- startPart until startPart + nParts) { + val partition = curPartitions(i) + info(consumerThreadId + " attempting to claim partition " + partition) + // record the partition ownership decision + partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId) + } + } + } + } + + partitionOwnershipDecision + } +} diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index 8b0ae57..0954b3c 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -24,28 +24,37 @@ import kafka.common.KafkaException private[kafka] trait TopicCount { - def getConsumerThreadIdsPerTopic: Map[String, Set[String]] + def getConsumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] def getTopicCountMap: Map[String, Int] def pattern: String - - protected def makeConsumerThreadIdsPerTopic(consumerIdString: String, - topicCountMap: Map[String, Int]) = { - val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]() + +} + +case class ConsumerThreadId(consumer: String, threadId: Int) extends Ordered[ConsumerThreadId] { + override def toString = "%s-%d".format(consumer, threadId) + + def compare(that: ConsumerThreadId) = toString.compare(that.toString) +} + +private[kafka] object TopicCount extends Logging { + val whiteListPattern = "white_list" + val blackListPattern = "black_list" + val staticPattern = "static" + + def makeThreadId(consumerIdString: String, threadId: Int) = consumerIdString + "-" + threadId + + def makeConsumerThreadIdsPerTopic(consumerIdString: String, + topicCountMap: Map[String, Int]) = { + val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[ConsumerThreadId]]() for ((topic, nConsumers) 
<- topicCountMap) { - val consumerSet = new mutable.HashSet[String] + val consumerSet = new mutable.HashSet[ConsumerThreadId] assert(nConsumers >= 1) for (i <- 0 until nConsumers) - consumerSet += consumerIdString + "-" + i + consumerSet += ConsumerThreadId(consumerIdString, i) consumerThreadIdsPerTopicMap.put(topic, consumerSet) } consumerThreadIdsPerTopicMap } -} - -private[kafka] object TopicCount extends Logging { - val whiteListPattern = "white_list" - val blackListPattern = "black_list" - val staticPattern = "static" def constructTopicCount(group: String, consumerId: String, zkClient: ZkClient, excludeInternalTopics: Boolean) : TopicCount = { val dirs = new ZKGroupDirs(group) @@ -101,7 +110,7 @@ private[kafka] class StaticTopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) extends TopicCount { - def getConsumerThreadIdsPerTopic = makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap) + def getConsumerThreadIdsPerTopic = TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap) override def equals(obj: Any): Boolean = { obj match { @@ -124,7 +133,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient, def getConsumerThreadIdsPerTopic = { val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath) .filter(topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics)) - makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*)) + TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*)) } def getTopicCountMap = Map(Utils.JSONEscapeString(topicFilter.regex) -> numStreams) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index acfd064..3fd9634 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -90,7 +90,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private var zkClient: ZkClient = null private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] private val checkpointedOffsets = new Pool[TopicAndPartition, Long] - private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]] + private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]] private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-") private val messageStreamCreated = new AtomicBoolean(false) @@ -514,9 +514,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, class ZKRebalancerListener(val group: String, val consumerIdString: String, val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]]) extends IZkChildListener { + + private val partitionAssignor = PartitionAssignor.createInstance( + config.partitionAssignmentStrategy, group, consumerIdString, config.excludeInternalTopics, zkClient) + private var isWatcherTriggered = false private val lock = new ReentrantLock private val cond = lock.newCondition() + + @volatile private var allTopicsOwnedPartitionsCount = 0 + newGauge(config.clientId + "-" + config.groupId + "-AllTopicsOwnedPartitionsCount", new Gauge[Int] { + def value() = allTopicsOwnedPartitionsCount + }) + + private def ownedPartitionsCountMetricName(topic: String) = + 
"%s-%s-%s-OwnedPartitionsCount".format(config.clientId, config.groupId, topic) + private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") { override def run() { info("starting watcher executor thread for consumer " + consumerIdString) @@ -565,10 +578,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]])= { info("Releasing partition ownership") for ((topic, infos) <- localTopicRegistry) { - for(partition <- infos.keys) + for(partition <- infos.keys) { deletePartitionOwnershipFromZK(topic, partition) + } + removeMetric(ownedPartitionsCountMetricName(topic)) localTopicRegistry.remove(topic) } + allTopicsOwnedPartitionsCount = 0 } def resetState() { @@ -618,7 +634,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def rebalance(cluster: Cluster): Boolean = { val myTopicThreadIdsMap = TopicCount.constructTopicCount( group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic - val consumersPerTopicMap = getConsumersPerTopic(zkClient, group, config.excludeInternalTopics) val brokers = getAllBrokersInCluster(zkClient) if (brokers.size == 0) { // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started. @@ -629,9 +644,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, true } else { - val partitionsAssignmentPerTopicMap = getPartitionAssignmentForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq) - val partitionsPerTopicMap = partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq.sorted)) - /** * fetchers must be stopped to avoid data duplication, since if the current * rebalancing attempt fails, the partitions that are released could be owned by another consumer. @@ -642,67 +654,40 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, releasePartitionOwnership(topicRegistry) - var partitionOwnershipDecision = new collection.mutable.HashMap[TopicAndPartition, String]() - val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] - - for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) { - currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo]) - - val curConsumers = consumersPerTopicMap.get(topic).get - val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get - - val nPartsPerConsumer = curPartitions.size / curConsumers.size - val nConsumersWithExtraPart = curPartitions.size % curConsumers.size - - info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions + - " for topic " + topic + " with consumers: " + curConsumers) - - for (consumerThreadId <- consumerThreadIdSet) { - val myConsumerPosition = curConsumers.indexOf(consumerThreadId) - assert(myConsumerPosition >= 0) - val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) - val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1) - - /** - * Range-partition the sorted partitions to consumers for better locality. - * The first few consumers pick up an extra partition, if any. 
- */ - if (nParts <= 0) - warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic) - else { - for (i <- startPart until startPart + nParts) { - val partition = curPartitions(i) - info(consumerThreadId + " attempting to claim partition " + partition) - // record the partition ownership decision - partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId) - } - } - } - } + val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) + val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) + val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( + valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) // fetch current offsets for all topic-partitions val topicPartitions = partitionOwnershipDecision.keySet.toSeq + val offsetFetchResponseOpt = fetchOffsets(topicPartitions) if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined) false else { val offsetFetchResponse = offsetFetchResponseOpt.get - topicPartitions.foreach { topicAndPartition => + topicPartitions.foreach(topicAndPartition => { val (topic, partition) = topicAndPartition.asTuple val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset val threadId = partitionOwnershipDecision(topicAndPartition) addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId) - } + }) /** * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt * A rebalancing attempt is completed successfully only after the fetchers have been started correctly */ - if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) { - info("Updating the cache") - debug("Partitions per topic cache " + partitionsPerTopicMap) - debug("Consumers per topic cache " + consumersPerTopicMap) + if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) { + allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size + + partitionOwnershipDecision.view.groupBy(_._1.topic).foreach { case (topic, partitionThreadPairs) => + newGauge(ownedPartitionsCountMetricName(topic), new Gauge[Int] { + def value() = partitionThreadPairs.size + }) + } + topicRegistry = currentTopicRegistry updateFetcher(cluster) true @@ -753,7 +738,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_,_]]], - relevantTopicThreadIdsMap: Map[String, Set[String]]) { + relevantTopicThreadIdsMap: Map[String, Set[ConsumerThreadId]]) { // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer // after this rebalancing attempt val queuesTobeCleared = topicThreadIdAndQueues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2) @@ -776,7 +761,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[TopicAndPartition, String]): Boolean = { + private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[TopicAndPartition, ConsumerThreadId]): Boolean = { var successfullyOwnedPartitions : List[(String, Int)] = Nil val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner => val topic = partitionOwner._1.topic @@ -784,7 +769,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val 
consumerThreadId = partitionOwner._2 val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition) try { - createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId) + createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId.toString) info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic) successfullyOwnedPartitions ::= (topic, partition) true @@ -808,8 +793,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]], partition: Int, topic: String, - offset: Long, consumerThreadId: String) { - val partTopicInfoMap = currentTopicRegistry.get(topic) + offset: Long, consumerThreadId: ConsumerThreadId) { + val partTopicInfoMap = currentTopicRegistry.getAndMaybePut(topic) val queue = topicThreadIdAndQueues.get((topic, consumerThreadId)) val consumedOffset = new AtomicLong(offset) @@ -852,7 +837,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams // map of {topic -> Set(thread-1, thread-2, ...)} - val consumerThreadIdsPerTopic: Map[String, Set[String]] = + val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] = topicCount.getConsumerThreadIdsPerTopic val allQueuesAndStreams = topicCount match { diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala index 00df462..2313a57 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala @@ -72,6 +72,8 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging { new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"), new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"), new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"), + new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-OwnedPartitionsCount"), + new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "AllTopicsOwnedPartitionsCount"), // kafka.consumer.ConsumerFetcherManager new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"), diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index dcdc1ce..a7b1fdc 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -18,7 +18,7 @@ package kafka.utils import kafka.cluster.{Broker, Cluster} -import kafka.consumer.TopicCount +import kafka.consumer.{ConsumerThreadId, TopicCount} import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError, ZkBadVersionException} @@ -658,10 +658,10 @@ object ZkUtils extends Logging { getChildren(zkClient, dirs.consumerRegistryDir) } - def getConsumersPerTopic(zkClient: ZkClient, group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[String]] = { + def getConsumersPerTopic(zkClient: ZkClient, group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[ConsumerThreadId]] = { val dirs = new ZKGroupDirs(group) val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir) - val consumersPerTopicMap = new mutable.HashMap[String, List[String]] + val consumersPerTopicMap = new 
mutable.HashMap[String, List[ConsumerThreadId]] for (consumer <- consumers) { val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient, excludeInternalTopics) for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) { diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala new file mode 100644 index 0000000..7f34c78 --- /dev/null +++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.consumer + +import org.scalatest.junit.JUnit3Suite +import org.easymock.EasyMock +import org.I0Itec.zkclient.ZkClient +import org.apache.zookeeper.data.Stat +import kafka.consumer.{AssignmentContext, ConsumerThreadId, RangeAssignor, RoundRobinAssignor} +import kafka.utils.{TestUtils, Logging, ZkUtils, Json} +import unit.kafka.consumer.PartitionAssignorTest.{StaticSubscriptionInfo, Scenario, WildcardSubscriptionInfo} +import junit.framework.Assert._ +import kafka.common.TopicAndPartition + +class PartitionAssignorTest extends JUnit3Suite with Logging { + + def testRoundRobinPartitionAssignor() { + /** + * Fully check (by hand) two asymmetric scenarios - one with only wildcard subscriptions and one with a mix of + * wildcard and static subscriptions. + * + * Automatically test a range of symmetric and asymmetric scenarios - check for coverage and uniqueness. 
+ */ + + /** only wildcard scenario: c1 with two streams, c2 with three streams */ + val wildcardScenario = Scenario(group = "g1", + topicPartitionCounts = Map("x" -> 8, "y" -> 4, "z" -> 6), + subscriptions = Map("g1c1" -> WildcardSubscriptionInfo(streamCount = 2, + regex = ".*", + isWhitelist = true), + "g1c2" -> WildcardSubscriptionInfo(streamCount = 3, + regex = ".*", + isWhitelist = true))) + val allConsumerIds: Set[String] = Set("g1c1", "g1c2") + val wildcardScenarioZkClient = PartitionAssignorTest.setupZkClientMock(wildcardScenario) + EasyMock.replay(wildcardScenarioZkClient) + + val wildcardScenarioAssignor = new RoundRobinAssignor("g1", "g1c1", excludeInternalTopics = true, wildcardScenarioZkClient) + val wildcardScenarioAssignmentContext = new AssignmentContext("g1", "g1c1", excludeInternalTopics = true, wildcardScenarioZkClient) + val wildcardScenarioAssignment = wildcardScenarioAssignor.assign(wildcardScenarioAssignmentContext, allConsumerIds) + + PartitionAssignorTest.checkAssignmentCompleteAndUnique(wildcardScenario, wildcardScenarioAssignment) + + val expectedWildcardScenarioAssignment = Map( + TopicAndPartition("x", 0) -> ConsumerThreadId("g1c1", 0), TopicAndPartition("y", 0) -> ConsumerThreadId("g1c1", 1), + TopicAndPartition("z", 0) -> ConsumerThreadId("g1c2", 0), TopicAndPartition("x", 1) -> ConsumerThreadId("g1c2", 1), + TopicAndPartition("y", 1) -> ConsumerThreadId("g1c2", 2), TopicAndPartition("z", 1) -> ConsumerThreadId("g1c1", 0), + TopicAndPartition("x", 2) -> ConsumerThreadId("g1c1", 1), TopicAndPartition("y", 2) -> ConsumerThreadId("g1c2", 0), + TopicAndPartition("z", 2) -> ConsumerThreadId("g1c2", 1), TopicAndPartition("x", 3) -> ConsumerThreadId("g1c2", 2), + TopicAndPartition("y", 3) -> ConsumerThreadId("g1c1", 0), TopicAndPartition("z", 3) -> ConsumerThreadId("g1c1", 1), + TopicAndPartition("x", 4) -> ConsumerThreadId("g1c2", 0), TopicAndPartition("z", 4) -> ConsumerThreadId("g1c2", 1), + TopicAndPartition("x", 5) -> ConsumerThreadId("g1c2", 2), TopicAndPartition("z", 5) -> ConsumerThreadId("g1c1", 0), + TopicAndPartition("x", 6) -> ConsumerThreadId("g1c1", 1), TopicAndPartition("x", 7) -> ConsumerThreadId("g1c2", 0) + ) + assertTrue("Scenario %s: incorrect assignment\n".format(wildcardScenario), + expectedWildcardScenarioAssignment == wildcardScenarioAssignment) + + /** various scenarios with only wildcard consumers */ + (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => { + val consumerCount = 2.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1)) + val allConsumerIds = (1 to consumerCount).map("g1c" + _).toSet + val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1)) + val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1)) + + val topicPartitionCounts = Map((1 to topicCount).map(topic => { + ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount))) + }).toSeq:_*) + + // to check that another random consumer's computed global assignment is identical to C1's computed global assignment + var cx = 1 + while (cx == 1) cx = 1.max(TestUtils.random.nextInt(consumerCount + 1)) + + val symmetricSubscriptions = Map((1 to consumerCount).map(consumer => { + ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true)) + }).toSeq:_*) + val symmetricScenario = Scenario("g1", topicPartitionCounts, symmetricSubscriptions) + val 
symmetricZkClient = PartitionAssignorTest.setupZkClientMock(symmetricScenario) + EasyMock.replay(symmetricZkClient) + val symmetricAssignmentContext = new AssignmentContext("g1", "g1c1", excludeInternalTopics = true, symmetricZkClient) + val c1SymmetricAssignment = new RoundRobinAssignor("g1", "g1c1", excludeInternalTopics = true, symmetricZkClient) + .assign(symmetricAssignmentContext, allConsumerIds) + PartitionAssignorTest.checkAssignmentCompleteAndUnique(symmetricScenario, c1SymmetricAssignment) + PartitionAssignorTest.checkAssignmentIsUniform(symmetricScenario, streamCount, c1SymmetricAssignment) + + val cxSymmetricAssignmentContext = new AssignmentContext("g1", "g1c" + cx, excludeInternalTopics = true, symmetricZkClient) + val cxSymmetricAssignment = new RoundRobinAssignor("g1", "g1c" + cx, excludeInternalTopics = true, symmetricZkClient) + .assign(cxSymmetricAssignmentContext, allConsumerIds) + assertTrue("Scenario %s: inconsistent assignments between consumer 1 and %d.".format(symmetricScenario, cx), + cxSymmetricAssignment == c1SymmetricAssignment) + + val asymmetricSubscriptions = Map((1 to consumerCount).map(consumer => { + val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1)) + ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true)) + }).toSeq:_*) + val asymmetricScenario = Scenario("g1", topicPartitionCounts, asymmetricSubscriptions) + val asymmetricZkClient = PartitionAssignorTest.setupZkClientMock(asymmetricScenario) + EasyMock.replay(asymmetricZkClient) + val asymmetricAssignmentContext = new AssignmentContext("g1", "g1c1", excludeInternalTopics = true, asymmetricZkClient) + val asymmetricAssignment = new RoundRobinAssignor("g1", "g1c1", excludeInternalTopics = true, asymmetricZkClient) + .assign(asymmetricAssignmentContext, allConsumerIds) + PartitionAssignorTest.checkAssignmentCompleteAndUnique(asymmetricScenario, asymmetricAssignment) + + val cxAsymmetricAssignmentContext = new AssignmentContext("g1", "g1c1", excludeInternalTopics = true, asymmetricZkClient) + val cxAsymmetricAssignment = new RoundRobinAssignor("g1", "g1c" + cx, excludeInternalTopics = true, asymmetricZkClient) + .assign(cxAsymmetricAssignmentContext, allConsumerIds) + assertTrue("Scenario %s: inconsistent assignments between consumer 1 and %d.".format(asymmetricScenario, cx), + cxAsymmetricAssignment == asymmetricAssignment) + + }) + } + + def testRangePartitionAssignor() { + /** various scenarios with only wildcard consumers */ + (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => { + val consumerCount = 2.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1)) + val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1)) + + val topicPartitionCounts = Map((1 to topicCount).map(topic => { + ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount))) + }).toSeq:_*) + + // to check that another random consumer's computed global assignment is identical to C1's computed global assignment + var cx = 1 + while (cx == 1) cx = 1.max(TestUtils.random.nextInt(consumerCount + 1)) + + val subscriptions = Map((1 to consumerCount).map(consumer => { + val streamCounts = Map((1 to topicCount).map(topic => { + val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1)) + ("topic-" + topic, streamCount) + }).toSeq:_*) + ("g1c" + consumer, 
StaticSubscriptionInfo(streamCounts)) + }).toSeq:_*) + val scenario = Scenario("g1", topicPartitionCounts, subscriptions) + val zkClient = PartitionAssignorTest.setupZkClientMock(scenario) + EasyMock.replay(zkClient) + + val assignments = (1 to consumerCount).map(consumer => { + val ctx = new AssignmentContext("g1", "g1c" + consumer, excludeInternalTopics = true, zkClient) + new RangeAssignor("g1", "g1c" + consumer, excludeInternalTopics = true, zkClient).assign(ctx) + }) + + val globalAssignment = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() + assignments.foreach(assignment => { + assignment.foreach { case(topicPartition, owner) => + val previousOwnerOpt = globalAssignment.put(topicPartition, owner) + assertTrue("Scenario %s: %s is assigned to two owners.".format(scenario, topicPartition), previousOwnerOpt.isEmpty) + } + }) + + PartitionAssignorTest.checkAssignmentCompleteAndUnique(scenario, globalAssignment) + }) + } + +} + +private object PartitionAssignorTest extends Logging { + + private val TestCaseCount = 3 + private val MaxConsumerCount = 10 + private val MaxStreamCount = 8 + private val MaxTopicCount = 100 + private val MinTopicCount = 20 + private val MaxPartitionCount = 120 + private val MinPartitionCount = 8 + + private trait SubscriptionInfo { + def registrationString: String + } + + private case class StaticSubscriptionInfo(streamCounts: Map[String, Int]) extends SubscriptionInfo { + def registrationString = + Json.encode(Map("version" -> 1, + "subscription" -> streamCounts, + "pattern" -> "static", + "timestamp" -> 1234.toString)) + + override def toString = { + "Stream counts: " + streamCounts + } + } + + private case class WildcardSubscriptionInfo(streamCount: Int, regex: String, isWhitelist: Boolean) + extends SubscriptionInfo { + def registrationString = + Json.encode(Map("version" -> 1, + "subscription" -> Map(regex -> streamCount), + "pattern" -> (if (isWhitelist) "white_list" else "black_list"))) + + override def toString = { + "\"%s\":%d (%s)".format(regex, streamCount, if (isWhitelist) "whitelist" else "blacklist") + } + } + + private case class Scenario(group: String, + topicPartitionCounts: Map[String, Int], + /* consumerId -> SubscriptionInfo */ + subscriptions: Map[String, SubscriptionInfo]) { + override def toString = { + "\n" + + "Group : %s\n".format(group) + + "Topic partition counts : %s\n".format(topicPartitionCounts) + + "Consumer subscriptions : %s\n".format(subscriptions) + } + } + + private def setupZkClientMock(scenario: Scenario) = { + val consumers = java.util.Arrays.asList(scenario.subscriptions.keys.toSeq:_*) + + val zkClient = EasyMock.createStrictMock(classOf[ZkClient]) + EasyMock.checkOrder(zkClient, false) + + EasyMock.expect(zkClient.getChildren("/consumers/%s/ids".format(scenario.group))).andReturn(consumers) + EasyMock.expectLastCall().anyTimes() + + scenario.subscriptions.foreach { case(consumerId, subscriptionInfo) => + EasyMock.expect(zkClient.readData("/consumers/%s/ids/%s".format(scenario.group, consumerId), new Stat())) + .andReturn(subscriptionInfo.registrationString) + EasyMock.expectLastCall().anyTimes() + } + + scenario.topicPartitionCounts.foreach { case(topic, partitionCount) => + val replicaAssignment = Map((0 until partitionCount).map(partition => (partition.toString, Seq(0))):_*) + EasyMock.expect(zkClient.readData("/brokers/topics/%s".format(topic), new Stat())) + .andReturn(ZkUtils.replicaAssignmentZkData(replicaAssignment)) + EasyMock.expectLastCall().anyTimes() + } + + 
EasyMock.expect(zkClient.getChildren("/brokers/topics")).andReturn( + java.util.Arrays.asList(scenario.topicPartitionCounts.keys.toSeq:_*)) + EasyMock.expectLastCall().anyTimes() + + zkClient + } + + private def checkAssignmentCompleteAndUnique(scenario: Scenario, assignment: collection.Map[TopicAndPartition, ConsumerThreadId]) { + val assignedPartitions = assignment.keySet + val givenPartitions = scenario.topicPartitionCounts.flatMap{ case (topic, partitionCount) => + (0 until partitionCount).map(partition => TopicAndPartition(topic, partition)) + }.toSet + + assertTrue("Scenario %s: the list of given partitions and assigned partitions are different.".format(scenario), + givenPartitions == assignedPartitions) + + val counts = partitionOwnerCounts(assignment) + counts.foreach { case (topicPartition, count) => + assertTrue("Scenario %s: partition %s is owned by %d (i.e., more than one) consumer streams." + .format(scenario, topicPartition, count), count <= 1) + } + } + + private def checkAssignmentIsUniform(scenario: Scenario, streamCount: Int, assignment: collection.Map[TopicAndPartition, ConsumerThreadId]) { + val expectedMinOwnedCount = + scenario.topicPartitionCounts.valuesIterator.sum / (scenario.subscriptions.size * streamCount) + val expectedMaxOwnedCount = expectedMinOwnedCount + 1 + val validCounts = Seq(expectedMinOwnedCount, expectedMaxOwnedCount) + val actualCounts = PartitionAssignorTest.partitionsOwnedCounts(assignment) + actualCounts.foreach { case(stream, count) => + assertTrue("Scenario %s: consumer stream %s owns %d partitions - expected range is [%d, %d]." + .format(scenario, stream, count, expectedMinOwnedCount, expectedMaxOwnedCount), validCounts.contains(count)) + } + } + + /** For each partition, count the number of consumers that own that partition (should be exactly one). */ + private def partitionOwnerCounts(assignment: collection.Map[TopicAndPartition, ConsumerThreadId]) = { + val ownerCounts = collection.mutable.Map[TopicAndPartition, Int]() + assignment.foreach { case (topicPartition, owner) => + val updatedCount = ownerCounts.getOrElse(topicPartition, 0) + 1 + ownerCounts.put(topicPartition, updatedCount) + } + ownerCounts + } + + /** For each consumer stream, count the number of partitions that it owns. */ + private def partitionsOwnedCounts(assignment: collection.Map[TopicAndPartition, ConsumerThreadId]) = { + val ownedCounts = collection.mutable.Map[ConsumerThreadId, Int]() + assignment.foreach { case (topicPartition, owner) => + val updatedCount = ownedCounts.getOrElse(owner, 0) + 1 + ownedCounts.put(owner, updatedCount) + } + ownedCounts + } + +} + -- 1.8.5.2 (Apple Git-48)
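For reference only (not part of the patch): a minimal Scala sketch of how a consumer would opt into the new round-robin strategy through the "partition.assignment.strategy" property introduced above, assuming the existing 0.8.x high-level consumer API (kafka.consumer.Consumer). The ZooKeeper address, group name, topics, and stream counts below are placeholders.

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

object RoundRobinAssignmentExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181") // placeholder ZooKeeper connect string
    props.put("group.id", "g1")                      // placeholder consumer group
    // New property added by this patch; defaults to "range". Round-robin is valid
    // only if every consumer in the group subscribes to the same topics with
    // identical stream counts.
    props.put("partition.assignment.strategy", "roundrobin")

    val connector = Consumer.create(new ConsumerConfig(props))
    // Identical subscription across the group, as the round-robin assignor requires.
    val streams = connector.createMessageStreams(Map("x" -> 2, "y" -> 2))
    // ... consume from the returned KafkaStreams, then shut down cleanly.
    connector.shutdown()
  }
}

With two consumers configured this way, partitions of "x" and "y" are interleaved across all streams in the group (partition ownership counts differ by at most one per stream), rather than being ranged per topic.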