diff --git core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index f5df1fc..55a9d6b 100644
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -656,27 +656,25 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     val consumerThreadIdsPerTopic: Map[String, Set[String]] =
       topicCount.getConsumerThreadIdsPerTopic
 
-    /*
-     * This usage of map flatten breaks up consumerThreadIdsPerTopic into
-     * a set of (topic, thread-id) pairs that we then use to construct
-     * the updated (topic, thread-id) -> queues map
-     */
-    implicit def getTopicThreadIds(v: (String, Set[String])): Set[(String, String)] = v._2.map((v._1, _))
-
-    // iterator over (topic, thread-id) tuples
-    val topicThreadIds: Iterable[(String, String)] =
-      consumerThreadIdsPerTopic.flatten
-
     // list of (pairs of pairs): e.g., ((topic, thread-id),(queue, stream))
     val threadQueueStreamPairs = topicCount match {
       case wildTopicCount: WildcardTopicCount =>
-        for (tt <- topicThreadIds; qs <- queuesAndStreams) yield (tt -> qs)
-      case statTopicCount: StaticTopicCount => {
+        consumerThreadIdsPerTopic.map {
+          case (topic, threadIds) =>
+            require(threadIds.size == queuesAndStreams.size,
+              "Mismatch between wildcard thread ID count (%d) and queue count (%d)".format(
+              threadIds.size, queuesAndStreams.size))
+            threadIds.map(threadId => (topic, threadId)).zip(queuesAndStreams)
+        }.flatten
+      case statTopicCount: StaticTopicCount =>
+        val topicThreadIds = consumerThreadIdsPerTopic.map {
+          case(topic, threadIds) =>
+            threadIds.map((topic, _))
+        }.flatten
         require(topicThreadIds.size == queuesAndStreams.size,
           "Mismatch between thread ID count (%d) and queue count (%d)".format(
           topicThreadIds.size, queuesAndStreams.size))
         topicThreadIds.zip(queuesAndStreams)
-      }
     }
 
     threadQueueStreamPairs.foreach(e => {
