From 89399ffe4995ea9b4c2ebdb788f5dfd55001bc80 Mon Sep 17 00:00:00 2001 From: ulysses <646303253@qq.com> Date: Fri, 17 May 2019 16:01:50 +0800 Subject: [PATCH] Update ZkUtils.scala add try catch for getPartitionAssignmentForTopics() --- core/src/main/scala/kafka/utils/ZkUtils.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index dd850906b42..911d36cfe9c 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -653,13 +653,18 @@ class ZkUtils(val zkClient: ZkClient, val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]] topics.foreach { topic => readDataMaybeNull(getTopicPath(topic))._1.foreach { jsonPartitionMap => - Json.parseFull(jsonPartitionMap).foreach { js => - js.asJsonObject.get("partitions").foreach { partitionsJs => - partitionsJs.asJsonObject.iterator.foreach { case (partition, replicas) => - ret.put(TopicAndPartition(topic, partition.toInt), replicas.to[Seq[Int]]) - debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas)) + try{ + Json.parseFull(jsonPartitionMap).foreach { js => + js.asJsonObject.get("partitions").foreach { partitionsJs => + partitionsJs.asJsonObject.iterator.foreach { case (partition, replicas) => + ret.put(TopicAndPartition(topic, partition.toInt), replicas.to[Seq[Int]]) + debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas)) + } } } + } catch { + // if topic node exists but the data not json (e.g. null), skip this topic + case _ => error("Topic [%s] data error, skip it".format(jsonPartitionMap)) } } }