diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 1cf2f62..3a2f90a 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -49,6 +49,7 @@ object ConsumerConfig extends Config {
   val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
   val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
   val ExcludeInternalTopics = true
+  val PartitionDistributionStrategy = "range" /* select between "range", "roundrobin" and "symmetric" */
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
   val DefaultClientId = ""
@@ -175,6 +176,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
   val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics)
 
+  /** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin, symmetric. */
+  val partitionDistributionStrategy = props.getString("partition.distribution.strategy", PartitionDistributionStrategy)
+
   validate(this)
 }
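For illustration only (not part of the patch): the new setting is read through the regular consumer properties, so a client would opt into one of the non-default strategies roughly as sketched below. The property name and default come from the change above; the group and ZooKeeper values are placeholders.

    // sketch: selecting the round-robin strategy when creating a high-level consumer
    val props = new java.util.Properties()
    props.put("group.id", "my-group")                          // placeholder
    props.put("zookeeper.connect", "localhost:2181")           // placeholder
    props.put("partition.distribution.strategy", "roundrobin") // "range" (default), "roundrobin" or "symmetric"
    val config = new kafka.consumer.ConsumerConfig(props)
    val connector = kafka.consumer.Consumer.create(config)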
diff --git a/core/src/main/scala/kafka/consumer/PartitionAllocator.scala b/core/src/main/scala/kafka/consumer/PartitionAllocator.scala
new file mode 100644
index 0000000..c446e51
--- /dev/null
+++ b/core/src/main/scala/kafka/consumer/PartitionAllocator.scala
@@ -0,0 +1,205 @@
+package kafka.consumer
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.common.TopicAndPartition
+import kafka.utils.{Utils, ZkUtils, Logging}
+import scala.collection.mutable
+
+trait PartitionAllocator {
+
+  def allocate(): scala.collection.Map[TopicAndPartition, String]
+
+}
+
+class AllocatorContext(group: String, consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient) {
+  val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkClient, excludeInternalTopics)
+  val myTopicThreadIdsMap = myTopicCount.getConsumerThreadIdsPerTopic
+  val partitionsPerTopicMap = {
+    val partitionsAssignmentPerTopicMap = ZkUtils.getPartitionAssignmentForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq)
+    partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq.sorted))
+  }
+}
+
+/**
+ * Global round-robin allocation strategy - this should only be used when all consumers in the group have identical
+ * wildcard (i.e., not static) subscriptions. It lays out all the available partitions using a two-level ordering:
+ * first by partition number and then by topic. It also lays out the available wildcard consumer threads and proceeds
+ * to do a round-robin assignment from partition to consumer thread. The end result is that the partitions will be
+ * uniformly distributed (i.e., the partition ownership counts will be within a delta of exactly one across all
+ * consumer threads).
+ *
+ * @param group Consumer group
+ * @param consumerId Consumer id of the consumer in the group for which we are doing the allocation
+ * @param excludeInternalTopics Whether or not to exclude internal topics
+ * @param zkClient ZooKeeper client
+ */
+class SymmetricAllocator(group: String, consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient)
+  extends PartitionAllocator with Logging {
+
+  def allocate() = allocate(Set(consumerId))
+
+  def allocate(forConsumers: Set[String]) = {
+    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, String]()
+    val ctx = new AllocatorContext(group, consumerId, excludeInternalTopics, zkClient)
+
+    val consumers = ZkUtils.getConsumersInGroup(zkClient, group).sorted
+
+    // Check that all instances have identical subscriptions.
+    require(ctx.myTopicCount.pattern == TopicCount.whiteListPattern ||
+            ctx.myTopicCount.pattern == TopicCount.blackListPattern,
+            "Symmetric allocation is allowed only with whitelists or blacklists.")
+    consumers.foreach(otherConsumer => {
+      val otherTopicCount = TopicCount.constructTopicCount(group, otherConsumer, zkClient, excludeInternalTopics)
+      require(otherTopicCount.pattern == ctx.myTopicCount.pattern,
+              "Symmetric allocation is allowed only if all consumers have identical subscription patterns.\n" +
+              "Consumer %s has pattern %s and consumer %s has pattern %s."
+              .format(consumerId, ctx.myTopicCount.pattern, otherConsumer, otherTopicCount.pattern))
+      require(otherTopicCount.getTopicCountMap == ctx.myTopicCount.getTopicCountMap,
+              "Symmetric allocation is allowed only if all consumers have identical topic subscriptions.\n" +
+              "Consumer %s has topic-count %s and consumer %s has topic-count %s."
+              .format(consumerId, ctx.myTopicCount.getTopicCountMap, otherConsumer, otherTopicCount.getTopicCountMap))
+    })
+
+    // If subscriptions are identical then the threadIds will also be identical for each consumer.
+    val streamCount = ctx.myTopicThreadIdsMap.head._2.size
+
+    /*
+     * Generate an ordered (by consumerid-threadid) sequence of (consumerid-threadId, consumerId) pairs to do the
+     * assignment. We need to pair it with the consumer ID since we only need to return allocations for the consumers
+     * requested in allocate(forConsumers).
+     */
+    val allThreadIds = consumers.flatMap(consumer =>
+      (0 until streamCount).map(id => (TopicCount.makeThreadId(consumer, id), consumer))) // ordered by consumerid-threadid
+
+    val threadAllocator = Utils.circularIterator(allThreadIds)
+
+    val allTopicPartitions = ctx.partitionsPerTopicMap.flatMap { case(topic, partitions) =>
+      info("Consumer %s using symmetric allocation to rebalance the following partitions: %s for topic %s with consumers: %s"
+           .format(consumerId, partitions, topic, consumers))
+      partitions.map(partition => {
+        TopicAndPartition(topic, partition)
+      })
+    }.toSeq.sortWith((topicPartition1, topicPartition2) => {
+      /**
+       * Sample ordering: topic0-0, topic1-0, ..., topic0-1, topic1-1, ...
+       * This helps reduce the likelihood of all partitions of a given topic ending up on one consumer (if it has a
+       * high enough stream count).
+       */
+      if (topicPartition1.partition == topicPartition2.partition)
+        topicPartition1.topic < topicPartition2.topic
+      else
+        topicPartition1.partition < topicPartition2.partition
+    })
+
+    allTopicPartitions.foreach(topicPartition => {
+      val threadId = threadAllocator.next()
+      if (forConsumers.contains(threadId._2))
+        partitionOwnershipDecision += (topicPartition -> threadId._1)
+    })
+
+    partitionOwnershipDecision
+  }
+}
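+
+/*
+ * For illustration (a worked example of the ordering above; topic and consumer names are hypothetical): with topics
+ * t0 and t1 of two partitions each, and consumers c1 and c2 running one stream each, the partition-then-topic
+ * ordering is
+ *   t0-0, t1-0, t0-1, t1-1
+ * and the circular iteration over the thread ids (c1-0, c2-0) yields
+ *   t0-0 -> c1-0, t1-0 -> c2-0, t0-1 -> c1-0, t1-1 -> c2-0
+ * so every consumer thread ends up owning exactly two partitions.
+ */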
+
+/**
+ * Topic-level round-robin allocation strategy - while the global round-robin allocator works only with wildcards and
+ * identical subscriptions, this strategy accommodates asymmetric subscriptions (i.e., different stream counts) and a
+ * mix of wildcard and static subscriptions. It executes on a per-topic basis. For each topic, it lays out the
+ * available partitions and the available consumer threads and proceeds to do a round-robin assignment at two levels:
+ * first it selects the next available consumer instance and then it selects the next available consumer thread
+ * within that instance.
+ *
+ * @param group Consumer group
+ * @param consumerId Consumer id of the consumer in the group for which we are doing the allocation
+ * @param excludeInternalTopics Whether or not to exclude internal topics
+ * @param zkClient ZooKeeper client
+ */
+class RoundRobinAllocator(group: String, consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient)
+  extends PartitionAllocator with Logging {
+
+  def allocate() = allocate(Set(consumerId))
+
+  def allocate(forConsumers: Set[String]) = {
+    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, String]()
+    val ctx = new AllocatorContext(group, consumerId, excludeInternalTopics, zkClient)
+
+    val consumers = ZkUtils.getConsumersInGroup(zkClient, group).sorted
+
+    val consumersForTopic = mutable.Map[String, mutable.ListBuffer[String]]()
+
+    val threadAllocatorsForConsumer = consumers.map(consumer => {
+      (consumer, {
+        val threadIdsByTopic = ZkUtils.getConsumerThreadIds(zkClient, group, consumer, excludeInternalTopics)
+        threadIdsByTopic.map { case (topic, threadIds) =>
+          if (ctx.myTopicThreadIdsMap.contains(topic)) {
+            val consumersSoFar = consumersForTopic.getOrElse(topic, mutable.ListBuffer[String]())
+            consumersForTopic.put(topic, consumersSoFar += consumer)
+          }
+          (topic, Utils.circularIterator(threadIds.toSeq.sorted))
+        }
+      })
+    }).toMap
+
+    val consumerAllocatorsForTopic = consumersForTopic.map(e => (e._1, Utils.circularIterator(e._2))).toMap
+
+    // now do the assignment
+    ctx.myTopicThreadIdsMap.keys.foreach(topic => {
+      val curPartitions = ctx.partitionsPerTopicMap.get(topic).get
+      info("Consumer %s using round-robin allocation to rebalance the following partitions: %s for topic %s with consumers: %s"
+           .format(consumerId, curPartitions, topic, consumers))
+      curPartitions.foreach(partition => {
+        val assignedConsumer = consumerAllocatorsForTopic(topic).next()
+        val assignedThread = threadAllocatorsForConsumer(assignedConsumer)(topic).next()
+        if (forConsumers.contains(assignedConsumer))
+          partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> assignedThread)
+      })
+    })
+
+    partitionOwnershipDecision
+  }
+}
+
+class RangeAllocator(group: String, consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient)
+  extends PartitionAllocator with Logging {
+
+  def allocate() = {
+    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, String]()
+    val ctx = new AllocatorContext(group, consumerId, excludeInternalTopics, zkClient)
+    val consumersPerTopicMap =
ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics) + + for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIdsMap) { + val curConsumers = consumersPerTopicMap.get(topic).get + val curPartitions: Seq[Int] = ctx.partitionsPerTopicMap.get(topic).get + + val nPartsPerConsumer = curPartitions.size / curConsumers.size + val nConsumersWithExtraPart = curPartitions.size % curConsumers.size + + info("Consumer " + consumerId + " rebalancing the following partitions: " + curPartitions + + " for topic " + topic + " with consumers: " + curConsumers) + + for (consumerThreadId <- consumerThreadIdSet) { + val myConsumerPosition = curConsumers.indexOf(consumerThreadId) + assert(myConsumerPosition >= 0) + val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) + val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1) + + /** + * Range-partition the sorted partitions to consumers for better locality. + * The first few consumers pick up an extra partition, if any. + */ + if (nParts <= 0) + warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic) + else { + for (i <- startPart until startPart + nParts) { + val partition = curPartitions(i) + info(consumerThreadId + " attempting to claim partition " + partition) + // record the partition ownership decision + partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId) + } + } + } + } + + partitionOwnershipDecision + } +} diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index c793110..2292b79 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -27,25 +27,28 @@ private[kafka] trait TopicCount { def getConsumerThreadIdsPerTopic: Map[String, Set[String]] def getTopicCountMap: Map[String, Int] def pattern: String - - protected def makeConsumerThreadIdsPerTopic(consumerIdString: String, - topicCountMap: Map[String, Int]) = { + +} + +private[kafka] object TopicCount extends Logging { + val whiteListPattern = "white_list" + val blackListPattern = "black_list" + val staticPattern = "static" + + def makeThreadId(consumerIdString: String, threadId: Int) = consumerIdString + "-" + threadId + + def makeConsumerThreadIdsPerTopic(consumerIdString: String, + topicCountMap: Map[String, Int]) = { val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]() for ((topic, nConsumers) <- topicCountMap) { val consumerSet = new mutable.HashSet[String] assert(nConsumers >= 1) for (i <- 0 until nConsumers) - consumerSet += consumerIdString + "-" + i + consumerSet += makeThreadId(consumerIdString, i) consumerThreadIdsPerTopicMap.put(topic, consumerSet) } consumerThreadIdsPerTopicMap } -} - -private[kafka] object TopicCount extends Logging { - val whiteListPattern = "white_list" - val blackListPattern = "black_list" - val staticPattern = "static" def constructTopicCount(group: String, consumerId: String, zkClient: ZkClient, excludeInternalTopics: Boolean) : TopicCount = { val dirs = new ZKGroupDirs(group) @@ -101,7 +104,7 @@ private[kafka] class StaticTopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) extends TopicCount { - def getConsumerThreadIdsPerTopic = makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap) + def getConsumerThreadIdsPerTopic = TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap) 
override def equals(obj: Any): Boolean = { obj match { @@ -124,7 +127,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient, def getConsumerThreadIdsPerTopic = { val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath) .filter(topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics)) - makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*)) + TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*)) } def getTopicCountMap = Map(topicFilter.regex -> numStreams) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 65f518d..2449e5a 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -513,6 +513,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, class ZKRebalancerListener(val group: String, val consumerIdString: String, val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]]) extends IZkChildListener { + private val partitionAllocator = config.partitionDistributionStrategy match { + case "roundrobin" => new RoundRobinAllocator(group, consumerIdString, config.excludeInternalTopics, zkClient) + case "symmetric" => new SymmetricAllocator(group, consumerIdString, config.excludeInternalTopics, zkClient) + case _ => new RangeAllocator(group, consumerIdString, config.excludeInternalTopics, zkClient) + } private var isWatcherTriggered = false private val lock = new ReentrantLock private val cond = lock.newCondition() @@ -617,7 +622,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def rebalance(cluster: Cluster): Boolean = { val myTopicThreadIdsMap = TopicCount.constructTopicCount( group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic - val consumersPerTopicMap = getConsumersPerTopic(zkClient, group, config.excludeInternalTopics) val brokers = getAllBrokersInCluster(zkClient) if (brokers.size == 0) { // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started. @@ -628,9 +632,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, true } else { - val partitionsAssignmentPerTopicMap = getPartitionAssignmentForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq) - val partitionsPerTopicMap = partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq.sorted)) - /** * fetchers must be stopped to avoid data duplication, since if the current * rebalancing attempt fails, the partitions that are released could be owned by another consumer. 
@@ -641,67 +642,31 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, releasePartitionOwnership(topicRegistry) - var partitionOwnershipDecision = new collection.mutable.HashMap[TopicAndPartition, String]() - val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] - - for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) { - currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo]) - - val curConsumers = consumersPerTopicMap.get(topic).get - val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get - - val nPartsPerConsumer = curPartitions.size / curConsumers.size - val nConsumersWithExtraPart = curPartitions.size % curConsumers.size - - info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions + - " for topic " + topic + " with consumers: " + curConsumers) - - for (consumerThreadId <- consumerThreadIdSet) { - val myConsumerPosition = curConsumers.indexOf(consumerThreadId) - assert(myConsumerPosition >= 0) - val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) - val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1) - - /** - * Range-partition the sorted partitions to consumers for better locality. - * The first few consumers pick up an extra partition, if any. - */ - if (nParts <= 0) - warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic) - else { - for (i <- startPart until startPart + nParts) { - val partition = curPartitions(i) - info(consumerThreadId + " attempting to claim partition " + partition) - // record the partition ownership decision - partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId) - } - } - } - } + val partitionOwnershipDecision = partitionAllocator.allocate() + val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( + valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) // fetch current offsets for all topic-partitions val topicPartitions = partitionOwnershipDecision.keySet.toSeq + val offsetFetchResponseOpt = fetchOffsets(topicPartitions) if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined) false else { val offsetFetchResponse = offsetFetchResponseOpt.get - topicPartitions.foreach { topicAndPartition => + topicPartitions.foreach(topicAndPartition => { val (topic, partition) = topicAndPartition.asTuple val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset val threadId = partitionOwnershipDecision(topicAndPartition) addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId) - } + }) /** * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt * A rebalancing attempt is completed successfully only after the fetchers have been started correctly */ if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) { - info("Updating the cache") - debug("Partitions per topic cache " + partitionsPerTopicMap) - debug("Consumers per topic cache " + consumersPerTopicMap) topicRegistry = currentTopicRegistry updateFetcher(cluster) true @@ -808,7 +773,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]], partition: Int, topic: String, offset: Long, consumerThreadId: String) { - val partTopicInfoMap = currentTopicRegistry.get(topic) + 
val partTopicInfoMap = currentTopicRegistry.getAndMaybePut(topic) val queue = topicThreadIdAndQueues.get((topic, consumerThreadId)) val consumedOffset = new AtomicLong(offset) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index dcdc1ce..b82bb4e 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -658,6 +658,11 @@ object ZkUtils extends Logging { getChildren(zkClient, dirs.consumerRegistryDir) } + def getConsumerThreadIds(zkClient:ZkClient, group: String, consumer: String, excludeInternalTopics: Boolean) = { + val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient, excludeInternalTopics) + topicCount.getConsumerThreadIdsPerTopic + } + def getConsumersPerTopic(zkClient: ZkClient, group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[String]] = { val dirs = new ZKGroupDirs(group) val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir) diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAllocatorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAllocatorTest.scala new file mode 100644 index 0000000..b0369c8 --- /dev/null +++ b/core/src/test/scala/unit/kafka/consumer/PartitionAllocatorTest.scala @@ -0,0 +1,293 @@ +package unit.kafka.consumer + +import org.scalatest.junit.JUnit3Suite +import org.easymock.EasyMock +import org.I0Itec.zkclient.ZkClient +import org.apache.zookeeper.data.Stat +import kafka.consumer.{SymmetricAllocator, RoundRobinAllocator} +import kafka.utils.{TestUtils, Logging, ZkUtils, Json} +import unit.kafka.consumer.PartitionAllocatorTest.{StaticSubscriptionInfo, Scenario, WildcardSubscriptionInfo} +import junit.framework.Assert._ +import kafka.common.TopicAndPartition + +class PartitionAllocatorTest extends JUnit3Suite with Logging { + + def testSymmetricAllocator() { + (1 to PartitionAllocatorTest.TestCaseCount).foreach (testCase => { + val consumerCount = 2.max(TestUtils.random.nextInt(PartitionAllocatorTest.MaxConsumerCount + 1)) + val allConsumerIds = (1 to consumerCount).map("g1c" + _).toSet + val streamCount = 1.max(TestUtils.random.nextInt(PartitionAllocatorTest.MaxStreamCount + 1)) + val topicCount = PartitionAllocatorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAllocatorTest.MaxTopicCount + 1)) + + val topicPartitionCounts = Map((1 to topicCount).map(topic => { + ("topic-" + topic, PartitionAllocatorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAllocatorTest.MaxPartitionCount + 1))) + }).toSeq:_*) + + val subscriptions = Map((1 to consumerCount).map(consumer => { + ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true)) + }).toSeq:_*) + val scenario = Scenario("g1", topicPartitionCounts, subscriptions) + val zkClient = PartitionAllocatorTest.setupZkClientMock(scenario) + EasyMock.replay(zkClient) + + // to check that another (random) consumer's computed global allocation is identical to c1's computed global allocation + var cx = 1 + while (cx == 1) cx = 1.max(TestUtils.random.nextInt(consumerCount + 1)) + + val c1Allocation = new SymmetricAllocator("g1", "g1c1", excludeInternalTopics = true, zkClient) + .allocate(allConsumerIds) + PartitionAllocatorTest.checkAllocationsAreCompleteAndUnique(scenario, c1Allocation) + PartitionAllocatorTest.checkAllocationsAreUniform(scenario, streamCount, c1Allocation) + + val cxAllocation = new SymmetricAllocator("g1", "g1c" + cx, excludeInternalTopics = true, zkClient) + 
.allocate(allConsumerIds) + assertTrue("Scenario %s: inconsistent allocations between consumer 1 and %d.".format(scenario, cx), + cxAllocation == c1Allocation) + }) + } + + def testRoundRobinPartitionAllocator() { + /** + * Fully check (by hand) two asymmetric scenarios - one with only wildcard subscriptions and one with a mix of + * wildcard and static subscriptions. + * + * Automatically test a range of symmetric and asymmetric scenarios - check for coverage and uniqueness. + */ + + /** only wildcard scenario: c1 with two streams, c2 with three streams */ + val wildcardScenario = Scenario(group = "g1", + topicPartitionCounts = Map("x" -> 8, "y" -> 4, "z" -> 6), + subscriptions = Map("g1c1" -> WildcardSubscriptionInfo(streamCount = 2, + regex = ".*", + isWhitelist = true), + "g1c2" -> WildcardSubscriptionInfo(streamCount = 3, + regex = ".*", + isWhitelist = true))) + val allConsumerIds: Set[String] = Set("g1c1", "g1c2") + val wildcardScenarioZkClient = PartitionAllocatorTest.setupZkClientMock(wildcardScenario) + EasyMock.replay(wildcardScenarioZkClient) + + val wildcardScenarioAllocator = new RoundRobinAllocator("g1", "g1c1", excludeInternalTopics = true, wildcardScenarioZkClient) + val wildcardScenarioAllocation = wildcardScenarioAllocator.allocate(allConsumerIds) + + PartitionAllocatorTest.checkAllocationsAreCompleteAndUnique(wildcardScenario, wildcardScenarioAllocation) + + val expectedWildcardScenarioAllocation = Map( + TopicAndPartition("x", 0) -> "g1c1-0", TopicAndPartition("x", 1) -> "g1c2-0", TopicAndPartition("x", 2) -> "g1c1-1", + TopicAndPartition("x", 3) -> "g1c2-1", TopicAndPartition("x", 4) -> "g1c1-0", TopicAndPartition("x", 5) -> "g1c2-2", + TopicAndPartition("x", 6) -> "g1c1-1", TopicAndPartition("x", 7) -> "g1c2-0", + TopicAndPartition("y", 0) -> "g1c1-0", TopicAndPartition("y", 1) -> "g1c2-0", TopicAndPartition("y", 2) -> "g1c1-1", + TopicAndPartition("y", 3) -> "g1c2-1", + TopicAndPartition("z", 0) -> "g1c1-0", TopicAndPartition("z", 1) -> "g1c2-0", TopicAndPartition("z", 2) -> "g1c1-1", + TopicAndPartition("z", 3) -> "g1c2-1", TopicAndPartition("z", 4) -> "g1c1-0", TopicAndPartition("z", 5) -> "g1c2-2" + ) + assertTrue("Scenario %s: incorrect allocation\n".format(wildcardScenario), + expectedWildcardScenarioAllocation == wildcardScenarioAllocation) + + /** mixed scenario - i.e., both static and wildcard consumers */ + val mixedScenario = Scenario(group = "g1", + topicPartitionCounts = Map("x" -> 8, "y" -> 4, "z" -> 6), + subscriptions = Map("g1c1" -> WildcardSubscriptionInfo(streamCount = 2, + regex = ".*", + isWhitelist = true), + "g1c2" -> StaticSubscriptionInfo(Map("x" -> 3, "y" -> 1)))) + val mixedScenarioZkClient = PartitionAllocatorTest.setupZkClientMock(mixedScenario) + EasyMock.replay(mixedScenarioZkClient) + + val mixedScenarioAllocator = new RoundRobinAllocator("g1", "g1c1", excludeInternalTopics = true, mixedScenarioZkClient) + val mixedScenarioAllocation = mixedScenarioAllocator.allocate(allConsumerIds) + + PartitionAllocatorTest.checkAllocationsAreCompleteAndUnique(mixedScenario, mixedScenarioAllocation) + + val expectedMixedScenarioAllocation = Map( + TopicAndPartition("x", 0) -> "g1c1-0", TopicAndPartition("x", 1) -> "g1c2-0", TopicAndPartition("x", 2) -> "g1c1-1", + TopicAndPartition("x", 3) -> "g1c2-1", TopicAndPartition("x", 4) -> "g1c1-0", TopicAndPartition("x", 5) -> "g1c2-2", + TopicAndPartition("x", 6) -> "g1c1-1", TopicAndPartition("x", 7) -> "g1c2-0", + TopicAndPartition("y", 0) -> "g1c1-0", TopicAndPartition("y", 1) -> "g1c2-0", 
TopicAndPartition("y", 2) -> "g1c1-1", + TopicAndPartition("y", 3) -> "g1c2-0", + TopicAndPartition("z", 0) -> "g1c1-0", TopicAndPartition("z", 1) -> "g1c1-1", TopicAndPartition("z", 2) -> "g1c1-0", + TopicAndPartition("z", 3) -> "g1c1-1", TopicAndPartition("z", 4) -> "g1c1-0", TopicAndPartition("z", 5) -> "g1c1-1" + ) + assertTrue("Scenario %s: incorrect allocation\n".format(mixedScenario), + expectedMixedScenarioAllocation == mixedScenarioAllocation) + + /** various scenarios with only wildcard consumers */ + (1 to PartitionAllocatorTest.TestCaseCount).foreach (testCase => { + val consumerCount = 2.max(TestUtils.random.nextInt(PartitionAllocatorTest.MaxConsumerCount + 1)) + val allConsumerIds = (1 to consumerCount).map("g1c" + _).toSet + val streamCount = 1.max(TestUtils.random.nextInt(PartitionAllocatorTest.MaxStreamCount + 1)) + val topicCount = PartitionAllocatorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAllocatorTest.MaxTopicCount + 1)) + + val topicPartitionCounts = Map((1 to topicCount).map(topic => { + ("topic-" + topic, PartitionAllocatorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAllocatorTest.MaxPartitionCount))) + }).toSeq:_*) + + // to check that another random consumer's computed global allocation is identical to C1's computed global allocation + var cx = 1 + while (cx == 1) cx = 1.max(TestUtils.random.nextInt(consumerCount + 1)) + + val symmetricSubscriptions = Map((1 to consumerCount).map(consumer => { + ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true)) + }).toSeq:_*) + val symmetricScenario = Scenario("g1", topicPartitionCounts, symmetricSubscriptions) + val symmetricZkClient = PartitionAllocatorTest.setupZkClientMock(symmetricScenario) + EasyMock.replay(symmetricZkClient) + val c1SymmetricAllocation = new RoundRobinAllocator("g1", "g1c1", excludeInternalTopics = true, symmetricZkClient) + .allocate(allConsumerIds) + PartitionAllocatorTest.checkAllocationsAreCompleteAndUnique(symmetricScenario, c1SymmetricAllocation) + + val cxSymmetricAllocation = new RoundRobinAllocator("g1", "g1c" + cx, excludeInternalTopics = true, symmetricZkClient) + .allocate(allConsumerIds) + assertTrue("Scenario %s: inconsistent allocations between consumer 1 and %d.".format(symmetricScenario, cx), + cxSymmetricAllocation == c1SymmetricAllocation) + + val asymmetricSubscriptions = Map((1 to consumerCount).map(consumer => { + val streamCount = 1.max(TestUtils.random.nextInt(1)) + ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true)) + }).toSeq:_*) + val asymmetricScenario = Scenario("g1", topicPartitionCounts, asymmetricSubscriptions) + val asymmetricZkClient = PartitionAllocatorTest.setupZkClientMock(asymmetricScenario) + EasyMock.replay(asymmetricZkClient) + val asymmetricAllocation = new RoundRobinAllocator("g1", "g1c1", excludeInternalTopics = true, asymmetricZkClient) + .allocate(allConsumerIds) + PartitionAllocatorTest.checkAllocationsAreCompleteAndUnique(asymmetricScenario, asymmetricAllocation) + + val cxAsymmetricAllocation = new RoundRobinAllocator("g1", "g1c" + cx, excludeInternalTopics = true, asymmetricZkClient) + .allocate(allConsumerIds) + assertTrue("Scenario %s: inconsistent allocations between consumer 1 and %d.".format(asymmetricScenario, cx), + cxAsymmetricAllocation == asymmetricAllocation) + + }) + } + +} + +private object PartitionAllocatorTest extends Logging { + + private val TestCaseCount = 5 + private val MaxConsumerCount = 20 + private val MaxStreamCount = 10 + private val 
MaxTopicCount = 200 + private val MinTopicCount = 20 + private val MaxPartitionCount = 120 + private val MinPartitionCount = 8 + + private trait SubscriptionInfo { + def registrationString: String + } + + private case class StaticSubscriptionInfo(streamCounts: Map[String, Int]) extends SubscriptionInfo { + def registrationString = + Json.encode(Map("version" -> 1, + "subscription" -> streamCounts, + "pattern" -> "static", + "timestamp" -> 1234.toString)) + + override def toString = { + "Stream counts: " + streamCounts + } + } + + private case class WildcardSubscriptionInfo(streamCount: Int, regex: String, isWhitelist: Boolean) + extends SubscriptionInfo { + def registrationString = + Json.encode(Map("version" -> 1, + "subscription" -> Map(regex -> streamCount), + "pattern" -> (if (isWhitelist) "white_list" else "black_list"))) + + override def toString = { + "\"%s\":%d (%s)".format(regex, streamCount, if (isWhitelist) "whitelist" else "blacklist") + } + } + + private case class Scenario(group: String, + topicPartitionCounts: Map[String, Int], + /* consumerId -> SubscriptionInfo */ + subscriptions: Map[String, SubscriptionInfo]) { + override def toString = { + "\n" + + "Group : %s\n".format(group) + + "Topic partition counts : %s\n".format(topicPartitionCounts) + + "Consumer subscriptions : %s\n".format(subscriptions) + } + } + + private def setupZkClientMock(scenario: Scenario) = { + val consumers = java.util.Arrays.asList(scenario.subscriptions.keys.toSeq:_*) + + val zkClient = EasyMock.createStrictMock(classOf[ZkClient]) + EasyMock.checkOrder(zkClient, false) + + EasyMock.expect(zkClient.getChildren("/consumers/%s/ids".format(scenario.group))).andReturn(consumers) + EasyMock.expectLastCall().anyTimes() + + scenario.subscriptions.foreach { case(consumerId, subscriptionInfo) => + EasyMock.expect(zkClient.readData("/consumers/%s/ids/%s".format(scenario.group, consumerId), new Stat())) + .andReturn(subscriptionInfo.registrationString) + EasyMock.expectLastCall().anyTimes() + } + + scenario.topicPartitionCounts.foreach { case(topic, partitionCount) => + val replicaAssignment = Map((0 until partitionCount).map(partition => (partition.toString, Seq(0))):_*) + EasyMock.expect(zkClient.readData("/brokers/topics/%s".format(topic), new Stat())) + .andReturn(ZkUtils.replicaAssignmentZkData(replicaAssignment)) + EasyMock.expectLastCall().anyTimes() + } + + EasyMock.expect(zkClient.getChildren("/brokers/topics")).andReturn( + java.util.Arrays.asList(scenario.topicPartitionCounts.keys.toSeq:_*)) + EasyMock.expectLastCall().anyTimes() + + zkClient + } + + private def checkAllocationsAreCompleteAndUnique(scenario: Scenario, allocation: collection.Map[TopicAndPartition, String]) { + val allocatedPartitions = allocation.keySet + val givenPartitions = scenario.topicPartitionCounts.flatMap{ case (topic, partitionCount) => + (0 until partitionCount).map(partition => TopicAndPartition(topic, partition)) + }.toSet + + assertTrue("Scenario %s: the list of given partitions and assigned partitions are different.".format(scenario), + givenPartitions == allocatedPartitions) + + val counts = partitionOwnerCounts(allocation) + counts.foreach { case (topicPartition, count) => + assertTrue("Scenario %s: partition %s is owned by %d (i.e., more than one) consumer streams." 
+ .format(scenario, topicPartition, count), count <= 1) + } + } + + private def checkAllocationsAreUniform(scenario: Scenario, streamCount: Int, allocation: collection.Map[TopicAndPartition, String]) { + val expectedMinOwnedCount = + scenario.topicPartitionCounts.valuesIterator.sum / (scenario.subscriptions.size * streamCount) + val expectedMaxOwnedCount = expectedMinOwnedCount + 1 + val validCounts = Seq(expectedMinOwnedCount, expectedMaxOwnedCount) + val actualCounts = PartitionAllocatorTest.partitionsOwnedCounts(allocation) + actualCounts.foreach { case(stream, count) => + assertTrue("Scenario %s: consumer stream %s owns %d partitions - expected range is [%d, %d]." + .format(scenario, stream, count, expectedMinOwnedCount, expectedMaxOwnedCount), validCounts.contains(count)) + } + } + + /** For each partition, count the number of consumers that own that partition (should be exactly one). */ + private def partitionOwnerCounts(allocation: collection.Map[TopicAndPartition, String]) = { + val ownerCounts = collection.mutable.Map[TopicAndPartition, Int]() + allocation.foreach { case (topicPartition, owner) => + val updatedCount = ownerCounts.getOrElse(topicPartition, 0) + 1 + ownerCounts.put(topicPartition, updatedCount) + } + ownerCounts + } + + /** For each consumer stream, count the number of partitions that it owns. */ + private def partitionsOwnedCounts(allocation: collection.Map[TopicAndPartition, String]) = { + val ownedCounts = collection.mutable.Map[String, Int]() + allocation.foreach { case (topicPartition, owner) => + val updatedCount = ownedCounts.getOrElse(owner, 0) + 1 + ownedCounts.put(owner, updatedCount) + } + ownedCounts + } + +} +
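The per-topic, two-level round-robin used by RoundRobinAllocator can also be illustrated with a small standalone sketch (plain Scala, no Kafka dependencies; the local circular helper stands in for Utils.circularIterator). Under those assumptions it reproduces the expected assignment for topic "x" in the wildcard scenario of the test above.

    // sketch: pick the next consumer for the topic, then the next stream within that consumer
    object RoundRobinSketch extends App {
      def circular[T](xs: Seq[T]): Iterator[T] = Iterator.from(0).map(i => xs(i % xs.size))

      // topic "x": 8 partitions; g1c1 subscribes with 2 streams, g1c2 with 3 streams
      val consumersForTopic = circular(Seq("g1c1", "g1c2"))
      val threadsForConsumer = Map(
        "g1c1" -> circular(Seq("g1c1-0", "g1c1-1")),
        "g1c2" -> circular(Seq("g1c2-0", "g1c2-1", "g1c2-2")))

      val assignment = (0 until 8).map { partition =>
        val consumer = consumersForTopic.next()
        partition -> threadsForConsumer(consumer).next()
      }
      println(assignment.mkString(", "))
      // (0,g1c1-0), (1,g1c2-0), (2,g1c1-1), (3,g1c2-1), (4,g1c1-0), (5,g1c2-2), (6,g1c1-1), (7,g1c2-0)
    }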