diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index f7782df..906b79e 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -681,28 +681,26 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val consumerThreadIdsPerTopic: Map[String, Set[String]] = topicCount.getConsumerThreadIdsPerTopic - /* - * This usage of map flatten breaks up consumerThreadIdsPerTopic into - * a set of (topic, thread-id) pairs that we then use to construct - * the updated (topic, thread-id) -> queues map - */ - implicit def getTopicThreadIds(v: (String, Set[String])): Set[(String, String)] = v._2.map((v._1, _)) - - // iterator over (topic, thread-id) tuples - val topicThreadIds: Iterable[(String, String)] = - consumerThreadIdsPerTopic.flatten - - // list of (pairs of pairs): e.g., ((topic, thread-id),(queue, stream)) - val threadQueueStreamPairs = topicCount match { + val allQueuesAndStreams = topicCount match { case wildTopicCount: WildcardTopicCount => - for (tt <- topicThreadIds; qs <- queuesAndStreams) yield (tt -> qs) - case statTopicCount: StaticTopicCount => { - require(topicThreadIds.size == queuesAndStreams.size, - "Mismatch between thread ID count (%d) and queue count (%d)".format( - topicThreadIds.size, queuesAndStreams.size)) - topicThreadIds.zip(queuesAndStreams) - } - } + /* + * Wild-card consumption streams share the same queues, so we need to + * duplicate the list for the subsequent zip operation. + */ + (1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList + case statTopicCount: StaticTopicCount => + queuesAndStreams + } + + val topicThreadIds = consumerThreadIdsPerTopic.map { + case(topic, threadIds) => + threadIds.map((topic, _)) + }.flatten + + require(topicThreadIds.size == allQueuesAndStreams.size, + "Mismatch between thread ID count (%d) and queue count (%d)" + .format(topicThreadIds.size, allQueuesAndStreams.size)) + val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams) threadQueueStreamPairs.foreach(e => { val topicThreadId = e._1 diff --git a/system_test/mirror_maker/bin/run-test.sh b/system_test/mirror_maker/bin/run-test.sh old mode 100644 new mode 100755