diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 48172ed..9426c96 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -425,11 +425,19 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, config.clientId, config.socketTimeoutMs, correlationId.getAndIncrement).topicsMetadata + debug("TopicsMetadata length: " + topicsMetadata.length) val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]] topicsMetadata.foreach(m => { - val topic = m.topic - val partitions = m.partitionsMetadata.map(m1 => m1.partitionId) - partitionsPerTopicMap.put(topic, partitions) + debug(m.toString) + m.errorCode match { + case ErrorMapping.NoError => + val topic = m.topic + val partitions = m.partitionsMetadata.map(m1 => m1.partitionId) + partitionsPerTopicMap.put(topic, partitions) + case _ => + throw new KafkaException("\nNo partition metadata for topic %s due to %s".format(m.topic, + ErrorMapping.exceptionFor(m.errorCode).getClass.getName)) + } }) /**