From 447581604b0d3175fee0e9460937daa86cd4df86 Mon Sep 17 00:00:00 2001 From: Joel Koshy Date: Mon, 25 Aug 2014 12:36:39 -0700 Subject: [PATCH] v5 --- .../kafka/common/requests/MetadataResponse.java | 27 +-- .../main/scala/kafka/consumer/ConsumerConfig.scala | 4 + .../scala/kafka/consumer/PartitionAssignor.scala | 162 ++++++++++++++ .../src/main/scala/kafka/consumer/TopicCount.scala | 39 ++-- .../consumer/ZookeeperConsumerConnector.scala | 94 ++++----- .../scala/kafka/metrics/KafkaMetricsGroup.scala | 2 + core/src/main/scala/kafka/tools/MirrorMaker.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 6 +- .../kafka/consumer/PartitionAssignorTest.scala | 234 +++++++++++++++++++++ 9 files changed, 484 insertions(+), 86 deletions(-) create mode 100644 core/src/main/scala/kafka/consumer/PartitionAssignor.scala create mode 100644 core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index d97962d..7d90fce 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -117,18 +117,21 @@ public class MetadataResponse extends AbstractRequestResponse { Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME); for (int j = 0; j < partitionInfos.length; j++) { Struct partitionInfo = (Struct) partitionInfos[j]; - int partition = partitionInfo.getInt(PARTITION_KEY_NAME); - int leader = partitionInfo.getInt(LEADER_KEY_NAME); - Node leaderNode = leader == -1 ? null : brokers.get(leader); - Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME); - Node[] replicaNodes = new Node[replicas.length]; - for (int k = 0; k < replicas.length; k++) - replicaNodes[k] = brokers.get(replicas[k]); - Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME); - Node[] isrNodes = new Node[isr.length]; - for (int k = 0; k < isr.length; k++) - isrNodes[k] = brokers.get(isr[k]); - partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes)); + short partError = partitionInfo.getShort(PARTITION_ERROR_CODE_KEY_NAME); + if (partError == Errors.NONE.code()) { + int partition = partitionInfo.getInt(PARTITION_KEY_NAME); + int leader = partitionInfo.getInt(LEADER_KEY_NAME); + Node leaderNode = leader == -1 ? 
null : brokers.get(leader); + Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME); + Node[] replicaNodes = new Node[replicas.length]; + for (int k = 0; k < replicas.length; k++) + replicaNodes[k] = brokers.get(replicas[k]); + Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME); + Node[] isrNodes = new Node[isr.length]; + for (int k = 0; k < isr.length; k++) + isrNodes[k] = brokers.get(isr[k]); + partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes)); + } } } else { errors.put(topic, Errors.forCode(topicError)); diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index 1cf2f62..99c229c 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -49,6 +49,7 @@ object ConsumerConfig extends Config { val MirrorTopicsWhitelistProp = "mirror.topics.whitelist" val MirrorTopicsBlacklistProp = "mirror.topics.blacklist" val ExcludeInternalTopics = true + val PartitionAssignmentStrategy = "range" /* select between "range", and "roundrobin" */ val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads" val DefaultClientId = "" @@ -175,6 +176,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( /** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */ val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics) + /** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */ + val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", PartitionAssignmentStrategy) + validate(this) } diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala new file mode 100644 index 0000000..203f220 --- /dev/null +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.consumer + +import org.I0Itec.zkclient.ZkClient +import kafka.common.TopicAndPartition +import kafka.utils.{Utils, ZkUtils, Logging} + +trait PartitionAssignor { + + /** + * Assigns partitions to all consumer instances in a group. + * @return The assignment of partition to consumer instance and thread. 
+ */ + def assign(ctx: AssignmentContext): scala.collection.Map[TopicAndPartition, ConsumerThreadId] + +} + +object PartitionAssignor { + def createInstance(assignmentStrategy: String) = assignmentStrategy match { + case "roundrobin" => new RoundRobinAssignor() + case _ => new RangeAssignor() + } +} + +class AssignmentContext(group: String, val consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient) { + val myTopicThreadIds = { + val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkClient, excludeInternalTopics) + myTopicCount.getConsumerThreadIdsPerTopic + } + val partitionsForTopic = { + val partitionsAssignmentPerTopic = ZkUtils.getPartitionAssignmentForTopics(zkClient, myTopicThreadIds.keySet.toSeq) + partitionsAssignmentPerTopic.map { case (topic, replicaAssignment) => + (topic, replicaAssignment.keySet.toSeq.sorted) + }.toMap + } + val consumersForTopic = ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics).toMap + val consumers = ZkUtils.getConsumersInGroup(zkClient, group).sorted +} + +/** + * The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It + * then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer + * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts + * will be within a delta of exactly one across all consumer threads.) + * + * (For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance + * and thread-id within that instance. Therefore, round-robin assignment is allowed only if: + * a) Every topic-partition has the same number of streams within a consumer instance + * b) The set of subscribed topics is identical for every consumer instance within the group. 
+ */ + +class RoundRobinAssignor() extends PartitionAssignor with Logging { + + def assign(ctx: AssignmentContext) = assign(ctx, Set(ctx.consumerId)) + + def assign(ctx: AssignmentContext, forConsumers: Set[String]) = { + val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() + + // check conditions (a) and (b) + val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet) + ctx.consumersForTopic.foreach { case (topic, threadIds) => + val threadIdSet = threadIds.toSet + require(threadIdSet == headThreadIdSet, + "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " + + "AND if the stream counts across topics are identical for a given consumer instance.\n" + + "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) + + "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet)) + } + + val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted) + + info("Starting round-robin assignment with consumers " + ctx.consumers) + val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) => + info("Consumer %s rebalancing the following partitions for topic %s: %s" + .format(ctx.consumerId, topic, partitions)) + partitions.map(partition => { + TopicAndPartition(topic, partition) + }) + }.toSeq.sortWith((topicPartition1, topicPartition2) => { + /* + * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending + * up on one consumer (if it has a high enough stream count). + */ + topicPartition1.toString.hashCode < topicPartition2.toString.hashCode + }) + + allTopicPartitions.foreach(topicPartition => { + val threadId = threadAssignor.next() + if (forConsumers.contains(threadId.consumer)) + partitionOwnershipDecision += (topicPartition -> threadId) + }) + + partitionOwnershipDecision + } +} + +/** + * Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order + * and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of + * consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly + * divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C1 + * and C2 with two streams each, and there are five available partitions (p0, p1, p2, p3, p4). So each consumer thread + * will get at least one partition and the first consumer thread will get one extra partition. 
So the assignment will be: + * p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1 + */ +class RangeAssignor() extends PartitionAssignor with Logging { + + def assign(ctx: AssignmentContext) = { + val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() + + for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) { + val curConsumers = ctx.consumersForTopic(topic) + val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic) + + val nPartsPerConsumer = curPartitions.size / curConsumers.size + val nConsumersWithExtraPart = curPartitions.size % curConsumers.size + + info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions + + " for topic " + topic + " with consumers: " + curConsumers) + + for (consumerThreadId <- consumerThreadIdSet) { + val myConsumerPosition = curConsumers.indexOf(consumerThreadId) + assert(myConsumerPosition >= 0) + val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) + val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1) + + /** + * Range-partition the sorted partitions to consumers for better locality. + * The first few consumers pick up an extra partition, if any. + */ + if (nParts <= 0) + warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic) + else { + for (i <- startPart until startPart + nParts) { + val partition = curPartitions(i) + info(consumerThreadId + " attempting to claim partition " + partition) + // record the partition ownership decision + partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId) + } + } + } + } + + partitionOwnershipDecision + } +} diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index 8b0ae57..0954b3c 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -24,28 +24,37 @@ import kafka.common.KafkaException private[kafka] trait TopicCount { - def getConsumerThreadIdsPerTopic: Map[String, Set[String]] + def getConsumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] def getTopicCountMap: Map[String, Int] def pattern: String - - protected def makeConsumerThreadIdsPerTopic(consumerIdString: String, - topicCountMap: Map[String, Int]) = { - val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]() + +} + +case class ConsumerThreadId(consumer: String, threadId: Int) extends Ordered[ConsumerThreadId] { + override def toString = "%s-%d".format(consumer, threadId) + + def compare(that: ConsumerThreadId) = toString.compare(that.toString) +} + +private[kafka] object TopicCount extends Logging { + val whiteListPattern = "white_list" + val blackListPattern = "black_list" + val staticPattern = "static" + + def makeThreadId(consumerIdString: String, threadId: Int) = consumerIdString + "-" + threadId + + def makeConsumerThreadIdsPerTopic(consumerIdString: String, + topicCountMap: Map[String, Int]) = { + val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[ConsumerThreadId]]() for ((topic, nConsumers) <- topicCountMap) { - val consumerSet = new mutable.HashSet[String] + val consumerSet = new mutable.HashSet[ConsumerThreadId] assert(nConsumers >= 1) for (i <- 0 until nConsumers) - consumerSet += consumerIdString + "-" + i + consumerSet += ConsumerThreadId(consumerIdString, i) consumerThreadIdsPerTopicMap.put(topic, 
consumerSet) } consumerThreadIdsPerTopicMap } -} - -private[kafka] object TopicCount extends Logging { - val whiteListPattern = "white_list" - val blackListPattern = "black_list" - val staticPattern = "static" def constructTopicCount(group: String, consumerId: String, zkClient: ZkClient, excludeInternalTopics: Boolean) : TopicCount = { val dirs = new ZKGroupDirs(group) @@ -101,7 +110,7 @@ private[kafka] class StaticTopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) extends TopicCount { - def getConsumerThreadIdsPerTopic = makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap) + def getConsumerThreadIdsPerTopic = TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap) override def equals(obj: Any): Boolean = { obj match { @@ -124,7 +133,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient, def getConsumerThreadIdsPerTopic = { val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath) .filter(topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics)) - makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*)) + TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*)) } def getTopicCountMap = Map(Utils.JSONEscapeString(topicFilter.regex) -> numStreams) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index acfd064..5a2eb6e 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -90,7 +90,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private var zkClient: ZkClient = null private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] private val checkpointedOffsets = new Pool[TopicAndPartition, Long] - private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]] + private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]] private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-") private val messageStreamCreated = new AtomicBoolean(false) @@ -514,9 +514,21 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, class ZKRebalancerListener(val group: String, val consumerIdString: String, val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]]) extends IZkChildListener { + + private val partitionAssignor = PartitionAssignor.createInstance(config.partitionAssignmentStrategy) + private var isWatcherTriggered = false private val lock = new ReentrantLock private val cond = lock.newCondition() + + @volatile private var allTopicsOwnedPartitionsCount = 0 + newGauge(config.clientId + "-" + config.groupId + "-AllTopicsOwnedPartitionsCount", new Gauge[Int] { + def value() = allTopicsOwnedPartitionsCount + }) + + private def ownedPartitionsCountMetricName(topic: String) = + "%s-%s-%s-OwnedPartitionsCount".format(config.clientId, config.groupId, topic) + private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") { override def run() { info("starting watcher executor thread for consumer " + consumerIdString) @@ -565,10 +577,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def releasePartitionOwnership(localTopicRegistry: Pool[String, 
Pool[Int, PartitionTopicInfo]])= { info("Releasing partition ownership") for ((topic, infos) <- localTopicRegistry) { - for(partition <- infos.keys) + for(partition <- infos.keys) { deletePartitionOwnershipFromZK(topic, partition) + } + removeMetric(ownedPartitionsCountMetricName(topic)) localTopicRegistry.remove(topic) } + allTopicsOwnedPartitionsCount = 0 } def resetState() { @@ -618,7 +633,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def rebalance(cluster: Cluster): Boolean = { val myTopicThreadIdsMap = TopicCount.constructTopicCount( group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic - val consumersPerTopicMap = getConsumersPerTopic(zkClient, group, config.excludeInternalTopics) val brokers = getAllBrokersInCluster(zkClient) if (brokers.size == 0) { // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started. @@ -629,9 +643,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, true } else { - val partitionsAssignmentPerTopicMap = getPartitionAssignmentForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq) - val partitionsPerTopicMap = partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq.sorted)) - /** * fetchers must be stopped to avoid data duplication, since if the current * rebalancing attempt fails, the partitions that are released could be owned by another consumer. @@ -642,67 +653,40 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, releasePartitionOwnership(topicRegistry) - var partitionOwnershipDecision = new collection.mutable.HashMap[TopicAndPartition, String]() - val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] - - for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) { - currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo]) - - val curConsumers = consumersPerTopicMap.get(topic).get - val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get - - val nPartsPerConsumer = curPartitions.size / curConsumers.size - val nConsumersWithExtraPart = curPartitions.size % curConsumers.size - - info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions + - " for topic " + topic + " with consumers: " + curConsumers) - - for (consumerThreadId <- consumerThreadIdSet) { - val myConsumerPosition = curConsumers.indexOf(consumerThreadId) - assert(myConsumerPosition >= 0) - val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) - val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1) - - /** - * Range-partition the sorted partitions to consumers for better locality. - * The first few consumers pick up an extra partition, if any. 
- */ - if (nParts <= 0) - warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic) - else { - for (i <- startPart until startPart + nParts) { - val partition = curPartitions(i) - info(consumerThreadId + " attempting to claim partition " + partition) - // record the partition ownership decision - partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId) - } - } - } - } + val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) + val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) + val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( + valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) // fetch current offsets for all topic-partitions val topicPartitions = partitionOwnershipDecision.keySet.toSeq + val offsetFetchResponseOpt = fetchOffsets(topicPartitions) if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined) false else { val offsetFetchResponse = offsetFetchResponseOpt.get - topicPartitions.foreach { topicAndPartition => + topicPartitions.foreach(topicAndPartition => { val (topic, partition) = topicAndPartition.asTuple val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset val threadId = partitionOwnershipDecision(topicAndPartition) addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId) - } + }) /** * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt * A rebalancing attempt is completed successfully only after the fetchers have been started correctly */ - if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) { - info("Updating the cache") - debug("Partitions per topic cache " + partitionsPerTopicMap) - debug("Consumers per topic cache " + consumersPerTopicMap) + if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) { + allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size + + partitionOwnershipDecision.view.groupBy(_._1.topic).foreach { case (topic, partitionThreadPairs) => + newGauge(ownedPartitionsCountMetricName(topic), new Gauge[Int] { + def value() = partitionThreadPairs.size + }) + } + topicRegistry = currentTopicRegistry updateFetcher(cluster) true @@ -753,7 +737,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_,_]]], - relevantTopicThreadIdsMap: Map[String, Set[String]]) { + relevantTopicThreadIdsMap: Map[String, Set[ConsumerThreadId]]) { // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer // after this rebalancing attempt val queuesTobeCleared = topicThreadIdAndQueues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2) @@ -776,7 +760,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[TopicAndPartition, String]): Boolean = { + private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[TopicAndPartition, ConsumerThreadId]): Boolean = { var successfullyOwnedPartitions : List[(String, Int)] = Nil val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner => val topic = partitionOwner._1.topic @@ -784,7 +768,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val 
consumerThreadId = partitionOwner._2 val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition) try { - createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId) + createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId.toString) info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic) successfullyOwnedPartitions ::= (topic, partition) true @@ -808,8 +792,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]], partition: Int, topic: String, - offset: Long, consumerThreadId: String) { - val partTopicInfoMap = currentTopicRegistry.get(topic) + offset: Long, consumerThreadId: ConsumerThreadId) { + val partTopicInfoMap = currentTopicRegistry.getAndMaybePut(topic) val queue = topicThreadIdAndQueues.get((topic, consumerThreadId)) val consumedOffset = new AtomicLong(offset) @@ -852,7 +836,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams // map of {topic -> Set(thread-1, thread-2, ...)} - val consumerThreadIdsPerTopic: Map[String, Set[String]] = + val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] = topicCount.getConsumerThreadIdsPerTopic val allQueuesAndStreams = topicCount match { diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala index 00df462..2313a57 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala @@ -72,6 +72,8 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging { new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"), new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"), new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"), + new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-OwnedPartitionsCount"), + new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "AllTopicsOwnedPartitionsCount"), // kafka.consumer.ConsumerFetcherManager new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"), diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index b8698ee..555d751 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -246,7 +246,7 @@ object MirrorMaker extends Logging { info("Starting mirror maker consumer thread " + threadName) try { for (msgAndMetadata <- stream) { - val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message) + val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.message) mirrorDataChannel.put(data) } } catch { diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index dcdc1ce..a7b1fdc 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -18,7 +18,7 @@ package kafka.utils import kafka.cluster.{Broker, Cluster} -import kafka.consumer.TopicCount +import kafka.consumer.{ConsumerThreadId, TopicCount} import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, 
ZkMarshallingError, ZkBadVersionException} @@ -658,10 +658,10 @@ object ZkUtils extends Logging { getChildren(zkClient, dirs.consumerRegistryDir) } - def getConsumersPerTopic(zkClient: ZkClient, group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[String]] = { + def getConsumersPerTopic(zkClient: ZkClient, group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[ConsumerThreadId]] = { val dirs = new ZKGroupDirs(group) val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir) - val consumersPerTopicMap = new mutable.HashMap[String, List[String]] + val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]] for (consumer <- consumers) { val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient, excludeInternalTopics) for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) { diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala new file mode 100644 index 0000000..1292390 --- /dev/null +++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package unit.kafka.consumer + +import org.scalatest.junit.JUnit3Suite +import org.easymock.EasyMock +import org.I0Itec.zkclient.ZkClient +import org.apache.zookeeper.data.Stat +import kafka.consumer.{AssignmentContext, ConsumerThreadId, RangeAssignor, RoundRobinAssignor} +import kafka.utils.{TestUtils, Logging, ZkUtils, Json} +import unit.kafka.consumer.PartitionAssignorTest.{StaticSubscriptionInfo, Scenario, WildcardSubscriptionInfo} +import junit.framework.Assert._ +import kafka.common.TopicAndPartition + +class PartitionAssignorTest extends JUnit3Suite with Logging { + + def testRoundRobinPartitionAssignor() { + /** various scenarios with only wildcard consumers */ + (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => { + val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1)) + val allConsumerIds = (1 to consumerCount).map("g1c" + _).toSet + val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1)) + val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1)) + + val topicPartitionCounts = Map((1 to topicCount).map(topic => { + ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount))) + }).toSeq:_*) + + val symmetricSubscriptions = Map((1 to consumerCount).map(consumer => { + ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true)) + }).toSeq:_*) + val symmetricScenario = Scenario("g1", topicPartitionCounts, symmetricSubscriptions) + val symmetricZkClient = PartitionAssignorTest.setupZkClientMock(symmetricScenario) + EasyMock.replay(symmetricZkClient) + val symmetricAssignmentContext = new AssignmentContext("g1", "g1c1", excludeInternalTopics = true, symmetricZkClient) + val symmetricAssignment = new RoundRobinAssignor().assign(symmetricAssignmentContext, allConsumerIds) + PartitionAssignorTest.checkAssignmentCompleteAndUnique(symmetricScenario, symmetricAssignment) + PartitionAssignorTest.checkAssignmentIsUniform(symmetricScenario, streamCount, symmetricAssignment) + + val asymmetricSubscriptions = Map((1 to consumerCount).map(consumer => { + val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1)) + ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true)) + }).toSeq:_*) + val asymmetricScenario = Scenario("g1", topicPartitionCounts, asymmetricSubscriptions) + val asymmetricZkClient = PartitionAssignorTest.setupZkClientMock(asymmetricScenario) + EasyMock.replay(asymmetricZkClient) + val asymmetricAssignmentContext = new AssignmentContext("g1", "g1c1", excludeInternalTopics = true, asymmetricZkClient) + val asymmetricAssignment = new RoundRobinAssignor().assign(asymmetricAssignmentContext, allConsumerIds) + PartitionAssignorTest.checkAssignmentCompleteAndUnique(asymmetricScenario, asymmetricAssignment) + }) + } + + def testRangePartitionAssignor() { + /** various scenarios with only wildcard consumers */ + (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => { + val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1)) + val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1)) + + val topicPartitionCounts = Map((1 to topicCount).map(topic => { + ("topic-" + topic, 
PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount))) + }).toSeq:_*) + + val subscriptions = Map((1 to consumerCount).map(consumer => { + val streamCounts = Map((1 to topicCount).map(topic => { + val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1)) + ("topic-" + topic, streamCount) + }).toSeq:_*) + ("g1c" + consumer, StaticSubscriptionInfo(streamCounts)) + }).toSeq:_*) + val scenario = Scenario("g1", topicPartitionCounts, subscriptions) + val zkClient = PartitionAssignorTest.setupZkClientMock(scenario) + EasyMock.replay(zkClient) + + val assignments = (1 to consumerCount).map(consumer => { + val ctx = new AssignmentContext("g1", "g1c" + consumer, excludeInternalTopics = true, zkClient) + new RangeAssignor().assign(ctx) + }) + + val globalAssignment = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() + assignments.foreach(assignment => { + assignment.foreach { case(topicPartition, owner) => + val previousOwnerOpt = globalAssignment.put(topicPartition, owner) + assertTrue("Scenario %s: %s is assigned to two owners.".format(scenario, topicPartition), previousOwnerOpt.isEmpty) + } + }) + + PartitionAssignorTest.checkAssignmentCompleteAndUnique(scenario, globalAssignment) + }) + } +} + +private object PartitionAssignorTest extends Logging { + + private val TestCaseCount = 3 + private val MaxConsumerCount = 10 + private val MaxStreamCount = 8 + private val MaxTopicCount = 100 + private val MinTopicCount = 20 + private val MaxPartitionCount = 120 + private val MinPartitionCount = 8 + + private trait SubscriptionInfo { + def registrationString: String + } + + private case class StaticSubscriptionInfo(streamCounts: Map[String, Int]) extends SubscriptionInfo { + def registrationString = + Json.encode(Map("version" -> 1, + "subscription" -> streamCounts, + "pattern" -> "static", + "timestamp" -> 1234.toString)) + + override def toString = { + "Stream counts: " + streamCounts + } + } + + private case class WildcardSubscriptionInfo(streamCount: Int, regex: String, isWhitelist: Boolean) + extends SubscriptionInfo { + def registrationString = + Json.encode(Map("version" -> 1, + "subscription" -> Map(regex -> streamCount), + "pattern" -> (if (isWhitelist) "white_list" else "black_list"))) + + override def toString = { + "\"%s\":%d (%s)".format(regex, streamCount, if (isWhitelist) "whitelist" else "blacklist") + } + } + + private case class Scenario(group: String, + topicPartitionCounts: Map[String, Int], + /* consumerId -> SubscriptionInfo */ + subscriptions: Map[String, SubscriptionInfo]) { + override def toString = { + "\n" + + "Group : %s\n".format(group) + + "Topic partition counts : %s\n".format(topicPartitionCounts) + + "Consumer subscriptions : %s\n".format(subscriptions) + } + } + + private def setupZkClientMock(scenario: Scenario) = { + val consumers = java.util.Arrays.asList(scenario.subscriptions.keys.toSeq:_*) + + val zkClient = EasyMock.createStrictMock(classOf[ZkClient]) + EasyMock.checkOrder(zkClient, false) + + EasyMock.expect(zkClient.getChildren("/consumers/%s/ids".format(scenario.group))).andReturn(consumers) + EasyMock.expectLastCall().anyTimes() + + scenario.subscriptions.foreach { case(consumerId, subscriptionInfo) => + EasyMock.expect(zkClient.readData("/consumers/%s/ids/%s".format(scenario.group, consumerId), new Stat())) + .andReturn(subscriptionInfo.registrationString) + EasyMock.expectLastCall().anyTimes() + } + + scenario.topicPartitionCounts.foreach { case(topic, 
partitionCount) => + val replicaAssignment = Map((0 until partitionCount).map(partition => (partition.toString, Seq(0))):_*) + EasyMock.expect(zkClient.readData("/brokers/topics/%s".format(topic), new Stat())) + .andReturn(ZkUtils.replicaAssignmentZkData(replicaAssignment)) + EasyMock.expectLastCall().anyTimes() + } + + EasyMock.expect(zkClient.getChildren("/brokers/topics")).andReturn( + java.util.Arrays.asList(scenario.topicPartitionCounts.keys.toSeq:_*)) + EasyMock.expectLastCall().anyTimes() + + zkClient + } + + private def checkAssignmentCompleteAndUnique(scenario: Scenario, assignment: collection.Map[TopicAndPartition, ConsumerThreadId]) { + val assignedPartitions = assignment.keySet + val givenPartitions = scenario.topicPartitionCounts.flatMap{ case (topic, partitionCount) => + (0 until partitionCount).map(partition => TopicAndPartition(topic, partition)) + }.toSet + + assertTrue("Scenario %s: the list of given partitions and assigned partitions are different.".format(scenario), + givenPartitions == assignedPartitions) + + val counts = partitionOwnerCounts(assignment) + counts.foreach { case (topicPartition, count) => + assertTrue("Scenario %s: partition %s is owned by %d (i.e., more than one) consumer streams." + .format(scenario, topicPartition, count), count == 1) + } + } + + private def checkAssignmentIsUniform(scenario: Scenario, streamCount: Int, assignment: collection.Map[TopicAndPartition, ConsumerThreadId]) { + val expectedMinOwnedCount = + scenario.topicPartitionCounts.valuesIterator.sum / (scenario.subscriptions.size * streamCount) + val expectedMaxOwnedCount = expectedMinOwnedCount + 1 + val validCounts = Seq(expectedMinOwnedCount, expectedMaxOwnedCount) + val actualCounts = PartitionAssignorTest.partitionCountPerStream(assignment) + actualCounts.foreach { case(stream, count) => + assertTrue("Scenario %s: consumer stream %s owns %d partitions - expected range is [%d, %d]." + .format(scenario, stream, count, expectedMinOwnedCount, expectedMaxOwnedCount), validCounts.contains(count)) + } + } + + /** For each partition, count the number of consumers that own that partition (should be exactly one). */ + private def partitionOwnerCounts(assignment: collection.Map[TopicAndPartition, ConsumerThreadId]) = { + val ownerCounts = collection.mutable.Map[TopicAndPartition, Int]() + assignment.foreach { case (topicPartition, owner) => + val updatedCount = ownerCounts.getOrElse(topicPartition, 0) + 1 + ownerCounts.put(topicPartition, updatedCount) + } + ownerCounts + } + + /** For each consumer stream, count the number of partitions that it owns. */ + private def partitionCountPerStream(assignment: collection.Map[TopicAndPartition, ConsumerThreadId]) = { + val ownedCounts = collection.mutable.Map[ConsumerThreadId, Int]() + assignment.foreach { case (topicPartition, owner) => + val updatedCount = ownedCounts.getOrElse(owner, 0) + 1 + ownedCounts.put(owner, updatedCount) + } + ownedCounts + } +} + -- 1.8.5.2 (Apple Git-48)
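
The new "partition.assignment.strategy" consumer property (default "range") is the only client-visible switch this patch adds. A minimal usage sketch, assuming a local ZooKeeper and a placeholder group id — the connection settings below are illustrative and not part of the patch:

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

object RoundRobinConsumerExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")  // placeholder connection string
    props.put("group.id", "example-group")            // placeholder group id
    // Added by this patch; valid values are "range" (default) and "roundrobin".
    // Round-robin requires all consumers in the group to subscribe to the same
    // topics with identical stream counts.
    props.put("partition.assignment.strategy", "roundrobin")

    val connector = Consumer.create(new ConsumerConfig(props))
    // ... create message streams and consume as usual ...
    connector.shutdown()
  }
}
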
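The two strategies themselves are easiest to see in isolation. Below is a self-contained sketch of the assignment logic described in the RangeAssignor and RoundRobinAssignor comments above; it bypasses AssignmentContext and ZooKeeper entirely, and the ThreadId/TopicPartition names are local stand-ins for the patch's ConsumerThreadId and TopicAndPartition. The main method reproduces the five-partition example from the RangeAssignor scaladoc.

import scala.math.min

object AssignmentSketch {

  case class ThreadId(consumer: String, stream: Int) {
    override def toString = "%s-%d".format(consumer, stream)
  }
  case class TopicPartition(topic: String, partition: Int)

  /** Range: per topic, sort partitions numerically and threads lexicographically,
    * give each thread a contiguous block, and let the first (partitions % threads)
    * threads take one extra partition. */
  def rangeAssign(partitionsPerTopic: Map[String, Int],
                  threadsPerTopic: Map[String, Seq[ThreadId]]): Map[TopicPartition, ThreadId] = {
    partitionsPerTopic.flatMap { case (topic, partitionCount) =>
      val threads = threadsPerTopic(topic).sortBy(_.toString)
      val base = partitionCount / threads.size
      val extra = partitionCount % threads.size
      threads.zipWithIndex.flatMap { case (thread, pos) =>
        val startPart = base * pos + min(pos, extra)
        val nParts = base + (if (pos < extra) 1 else 0)
        (startPart until startPart + nParts).map(p => TopicPartition(topic, p) -> thread)
      }
    }
  }

  /** Round-robin: pool the partitions of all topics, order them by hash code so that
    * one topic's partitions do not all land on one thread, then deal them out to the
    * sorted thread list in a circle. Assumes identical subscriptions, which the
    * patch's RoundRobinAssignor enforces with a require(). */
  def roundRobinAssign(partitionsPerTopic: Map[String, Int],
                       threads: Seq[ThreadId]): Map[TopicPartition, ThreadId] = {
    val allPartitions = partitionsPerTopic.toSeq
      .flatMap { case (topic, n) => (0 until n).map(TopicPartition(topic, _)) }
      .sortBy(_.toString.hashCode)
    val ring = threads.sortBy(_.toString)
    allPartitions.zipWithIndex.map { case (tp, i) => tp -> ring(i % ring.size) }.toMap
  }

  def main(args: Array[String]): Unit = {
    // The worked example from the RangeAssignor comment: consumers C1 and C2 with two
    // streams each, and five partitions p0..p4 of a single topic.
    val threads = Seq(ThreadId("C1", 0), ThreadId("C1", 1), ThreadId("C2", 0), ThreadId("C2", 1))
    rangeAssign(Map("t" -> 5), Map("t" -> threads)).toSeq
      .sortBy(_._1.partition)
      .foreach { case (tp, owner) => println("p%d -> %s".format(tp.partition, owner)) }
    // Prints: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1

    // Round-robin across two topics: ownership counts differ by at most one per thread.
    val rr = roundRobinAssign(Map("t1" -> 3, "t2" -> 4), threads)
    rr.groupBy(_._2).foreach { case (owner, owned) => println("%s owns %d".format(owner, owned.size)) }
  }
}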