
[KAFKA-799] Infinite loop trying to start a broker

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Not a Problem
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: controller
    • Labels:
    • Environment:
      Mac OS X 10.7.5

      Description

      I followed the quickstart instructions
      https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.8+Quick+Start

      It caused an infinite loop while trying to start a broker.
      [2013-03-08 16:55:19,287] ERROR Error while electing or becoming leader on broker 1 (kafka.server.ZookeeperLeaderElector)
      kafka.common.KafkaException: Can't parse json string: null
      at kafka.utils.Json$.liftedTree1$1(Json.scala:20)
      at kafka.utils.Json$.parseFull(Json.scala:16)
      at kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:484)
      at kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:480)
      at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
      at scala.collection.immutable.List.foreach(List.scala:45)
      at kafka.utils.ZkUtils$.getReplicaAssignmentForTopics(ZkUtils.scala:480)
      at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:451)
      at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:225)
      at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:87)
      at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:53)
      at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:106)
      at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
      at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
      Caused by: java.lang.NullPointerException
      at scala.util.parsing.combinator.lexical.Scanners$Scanner.<init>(Scanners.scala:52)
      at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:71)
      at scala.util.parsing.json.JSON$.parseFull(JSON.scala:85)
      at kafka.utils.Json$.liftedTree1$1(Json.scala:17)
      ... 13 more

      I tracked the issue down to an unhandled null JSON string in two ZkUtils methods: getReplicaAssignmentForTopics and getPartitionAssignmentForTopics.

      I am submitting a patch with the fixes. With the patch applied, the quickstart works fine for me.
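
      To illustrate the failure mode, here is a minimal sketch (my reconstruction, not the Kafka code itself; it assumes only that ZkClient returns null for a znode that exists but carries no data, which is what a topic znode created by 0.7 looks like to the 0.8 broker):

      // Sketch of the failure: an existing znode with empty data comes back
      // as Some(null) rather than None, so the None guard never fires.
      val data: String = null                // what readData yields for the empty znode
      val jsonPartitionMapOpt = Some(data)   // readDataMaybeNull wraps it: Some(null) != None

      jsonPartitionMapOpt match {
        case Some(json) =>
          // Json.parseFull delegates to scala.util.parsing.json.JSON.parseFull,
          // which NPEs on null -- the "Can't parse json string: null" above.
          kafka.utils.Json.parseFull(json)
        case None => // never reached for Some(null)
      }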

        Activity

        Bob added a comment -

        def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
          val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
          topics.foreach { topic =>
            val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
            jsonPartitionMapOpt match {
              case Some(jsonPartitionMap) =>
                if (jsonPartitionMap != null) {  // guard: the znode can exist with null data, i.e. Some(null)
                  Json.parseFull(jsonPartitionMap) match {
                    case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match {
                      case Some(repl) =>
                        val replicaMap = repl.asInstanceOf[Map[String, Seq[Int]]]
                        for ((partition, replicas) <- replicaMap) {
                          ret.put(TopicAndPartition(topic, partition.toInt), replicas)
                          debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
                        }
                      case None =>
                    }
                    case None =>
                  }
                }
              case None =>
            }
          }
          ret
        }

        def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
          val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
          topics.foreach { topic =>
            val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
            val partitionMap = jsonPartitionMapOpt match {
              case Some(jsonPartitionMap) =>
                if (jsonPartitionMap != null) {  // guard: the znode can exist with null data, i.e. Some(null)
                  Json.parseFull(jsonPartitionMap) match {
                    case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match {
                      case Some(replicaMap) =>
                        val m1 = replicaMap.asInstanceOf[Map[String, Seq[Int]]]
                        m1.map(p => (p._1.toInt, p._2))
                      case None => Map[Int, Seq[Int]]()
                    }
                    case None => Map[Int, Seq[Int]]()
                  }
                } else Map[Int, Seq[Int]]()
              case None => Map[Int, Seq[Int]]()
            }
            debug("Partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
            ret += (topic -> partitionMap)
          }
          ret
        }
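
        (A tighter variant of the same guard, just as a sketch and not part of the attached patch: wrapping the value in Option(...) normalizes Some(null) to None, so the explicit null check disappears.)

        // Sketch only: Option(s) yields None when s is null.
        def safeParse(raw: Option[String]): Option[Any] =
          raw.flatMap(s => Option(s))   // Some(null) -> None
             .flatMap(Json.parseFull)   // absent or unparseable -> None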

        Swapnil Ghike added a comment -

        Hmm, I tried booting up zookeeper and a kafka broker but could not reproduce this issue. Do you know any other way of reproducing this issue? Seems like the /brokers/topics/some-topic path in zk has a null stored for its data.

        As of now, I am not sure whether guarding against null is the right solution or whether it will merely hide another bug.
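
        (If it helps reproduce: a quick way to check what the znode actually holds, sketched with the same I0Itec ZkClient and string serializer the broker uses; the topic name is illustrative.)

        import org.I0Itec.zkclient.ZkClient
        import kafka.utils.ZKStringSerializer

        // Read the raw data at the topic path. A znode created by 0.7 exists
        // but carries no data, so this prints null.
        val zk = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
        val data: String = zk.readData("/brokers/topics/some-topic", true) // true = null if path absent
        println(data)
        zk.close()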

        Bob added a comment - edited

        Hmm indeed, I tried this on another Mac and didn't see the same issue. On the laptop with the issue, I installed 0.7.2 first and got that working, and then tried 0.8, which failed. Could that have caused the issue? Does zookeeper keep data in the same folder regardless of kafka version? It's in the /tmp/zookeeper folder, right? Should there never be a null value in this case? In Scala, Some(null) is not equal to None, so IMO it probably doesn't hurt to have the safeguard in there, but you are right, we don't want to just mask a bug.
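
        (For reference, the distinction in a Scala REPL:)

        scala> Some(null) == None
        res0: Boolean = false

        scala> Option(null) == None   // Option(...) normalizes null to None
        res1: Boolean = true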

        Swapnil Ghike added a comment -

        Yes, booting up a kafka 0.8 server against zk data created by kafka 0.7 indeed produces this message. The zk data structures used in 0.7 and 0.8 are different, and as you mentioned, both versions store the data in /tmp/zookeeper. So you need to clean up this directory before switching versions. Though it's not a big issue, it may be worthwhile to put the zookeeper data in separate directories for kafka 0.7 and 0.8 to avoid on-boarding obstacles. For now, you can do this locally by changing the path in config/zookeeper.properties (see the snippet below).
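
        (For example; the directory name here is just a suggestion:)

        # config/zookeeper.properties
        # a version-specific data dir avoids 0.7/0.8 collisions in /tmp/zookeeper
        dataDir=/tmp/zookeeper-kafka-0.8
        clientPort=2181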

        Kafka parses the zookeeper data for many purposes and any zk data that kafka uses is written using some kafka logic / admin tool. Since zookeeper is kafka's source of truth, we probably want to see an exception if the zookeeper data gets corrupted due to some external reason. For example, in this case, avoiding the exception will hide the fact that kafka can no longer find partitions/replicas of an existing topic.

        Bob added a comment -

        Thanks for the info; this would be useful information in the 0.8 quickstart. Since I am new to both Kafka and Zookeeper, I thought everything was self-contained within the kafka folder. I renamed the /tmp/zookeeper/ folder on my first laptop and was able to get the quickstart working without my changes.

        A suggestion though: I agree that this exception shouldn't be hidden; however, is there a way to do that without causing the infinite loop? It kept re-trying to elect the same failed broker over and over again. Thoughts?
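
        (From my reading of the stack trace, the cycle seems to be: the election fails inside onBecomingLeader, the elector resigns by deleting its znode, and the deletion watch triggers another election. A rough sketch of that mechanism, reconstructed from the trace rather than the actual ZookeeperLeaderElector code:)

        // Reconstruction, not the real implementation.
        def elect(): Boolean = {
          try {
            createEphemeralPath(electionPath)  // claim leadership
            onBecomingLeader()                 // throws the KafkaException above
            true
          } catch {
            case e: Throwable =>
              resign()                         // deletes electionPath...
              false
          }
        }

        def handleDataDeleted(path: String) {
          elect()                              // ...which fires this watch and retries
        }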

        Neha Narkhede added a comment -

        Bob,

        You are right, we should detect that a 0.8 broker is trying to talk to a 0.7 zookeeper and throw a meaningful error. KAFKA-686 is filed to fix that issue. We can fix the infinite loop as part of that as well.

        Bob added a comment -

        Thanks, Neha, that'll be great. So, /tmp is used to store ephemeral nodes? You have probably considered this already, but does it make sense to default the tmp folder to reside within each version so this type of conflict can be avoided in the future?

        Neha Narkhede added a comment -

        That is up to the zookeeper deployment configuration. We can probably rename the zookeeper configuration in the 0.8 branch to /tmp/zookeeper-kafka-0.8 or something like that.

        Bob added a comment -

        Yeah, that'll work too, thanks.


          People

          • Assignee: Swapnil Ghike
          • Reporter: Bob
          • Votes: 0
          • Watchers: 4

            Dates

            • Created:
              Updated:
              Resolved:
