diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 93a634e..45167c5 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -87,55 +87,49 @@ object AdminUtils extends Logging { case e2 => throw new AdministrationException(e2.toString) } } - - def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata = + + def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata = fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker]) def fetchTopicMetadataFromZk(topics: Set[String], zkClient: ZkClient): Set[TopicMetadata] = { val cachedBrokerInfo = new mutable.HashMap[Int, Broker]() topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo)) } - + private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = { if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) { val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) - val partitionMetadata = sortedPartitions.map { partitionMap => - val partition = partitionMap._1 - val replicas = partitionMap._2 - val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition) - val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) - debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader) + val partition = partitionMap._1 + val replicas = partitionMap._2 + val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition) + val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) + debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader) - var leaderInfo: Option[Broker] = None - var replicaInfo: Seq[Broker] = Nil - var isrInfo: Seq[Broker] = Nil - try { + var leaderInfo: Option[Broker] = None + var replicaInfo: Seq[Broker] = Nil + var isrInfo: Seq[Broker] = Nil try { - leaderInfo = leader match { - case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head) - case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) + try { + leaderInfo = leader match { + case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head) + case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) + } + } catch { + case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition)) + } + try { + replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)) + isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas) + } catch { + case e => throw new ReplicaNotAvailableException(e) } - } catch { - case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition)) - } - - try { - replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)) - isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas) - } catch { - case e => throw new ReplicaNotAvailableException(e) - } - new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError) } catch { - case e: ReplicaNotAvailableException => + case e => new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, - ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - case le: LeaderNotAvailableException => - new PartitionMetadata(partition, None, replicaInfo, isrInfo, - ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]])) + ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } } new TopicMetadata(topic, partitionMetadata) diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index a0e2b44..f2ab0a6 100644 --- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -80,10 +80,11 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, if(tmd.errorCode == ErrorMapping.NoError){ topicPartitionInfo.put(tmd.topic, tmd) } else - warn("Metadata for topic [%s] is erroneous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode)) + warn("Error while fetching metadata for topic [%s]: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode)) tmd.partitionsMetadata.foreach(pmd =>{ if (pmd.errorCode != ErrorMapping.NoError){ - debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode)) + warn("Error while fetching metadata for topic partition [%s,%d]: [%s]".format(tmd.topic, pmd.partitionId, pmd), + ErrorMapping.exceptionFor(pmd.errorCode)) } }) })