From 6d10f317c43f5aec5e968d536e61f80c9e7296ef Mon Sep 17 00:00:00 2001 From: Viktor Taranenko Date: Sat, 16 Aug 2014 20:19:38 +0100 Subject: [PATCH] fix potential NullPointerException --- core/src/main/scala/kafka/utils/ZkUtils.scala | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index dcdc1ce..69396f4 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -30,10 +30,10 @@ import kafka.admin._ import kafka.common.{KafkaException, NoEpochForPartitionException} import kafka.controller.ReassignedPartitionsContext import kafka.controller.KafkaController -import scala.Some import kafka.controller.LeaderIsrAndControllerEpoch import kafka.common.TopicAndPartition import scala.collection +import scala.util.control.Exception.failAsValue object ZkUtils extends Logging { val ConsumersPath = "/consumers" @@ -79,7 +79,7 @@ object ZkUtils extends Logging { def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = { val brokerIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).sorted - brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) + brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).flatten } def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = { @@ -452,14 +452,8 @@ object ZkUtils extends Logging { def readDataMaybeNull(client: ZkClient, path: String): (Option[String], Stat) = { val stat: Stat = new Stat() - val dataAndStat = try { - (Some(client.readData(path, stat)), stat) - } catch { - case e: ZkNoNodeException => - (None, stat) - case e2: Throwable => throw e2 - } - dataAndStat + val data = failAsValue(classOf[ZkNoNodeException])(None)(Option(client.readData(path, stat))) + (data, stat) } def getChildren(client: ZkClient, path: String): Seq[String] = { @@ -685,10 +679,8 @@ object ZkUtils extends Logging { * @return An optional Broker object encapsulating the broker metadata */ def getBrokerInfo(zkClient: ZkClient, brokerId: Int): Option[Broker] = { - ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match { - case Some(brokerInfo) => Some(Broker.createBroker(brokerId, brokerInfo)) - case None => None - } + ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 + .map(Broker.createBroker(brokerId, _)) } def getAllTopics(zkClient: ZkClient): Seq[String] = { @@ -703,9 +695,9 @@ object ZkUtils extends Logging { val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath) if(topics == null) Set.empty[TopicAndPartition] else { - topics.map { topic => + topics.flatMap { topic => getChildren(zkClient, getTopicPartitionsPath(topic)).map(_.toInt).map(TopicAndPartition(topic, _)) - }.flatten.toSet + }.toSet } } } -- 1.8.5.2 (Apple Git-48)