KAFKA-379: TopicCount.constructTopicCount isn't thread-safe

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7, 0.8.0
    • Fix Version/s: 0.7, 0.8.0
    • Component/s: clients

      Description

      TopicCount uses scala.util.parsing.json.JSON, which isn't thread-safe (see https://issues.scala-lang.org/browse/SI-4929).

      If you have multiple consumers within the same JVM and they all rebalance at the same time, you can get errors like the following:

      [...] kafka.consumer.TopicCount$.constructTopicCount:39] ERROR: error parsing consumer json string [...]
      java.lang.NullPointerException
      at scala.util.parsing.combinator.Parsers$NoSuccess.<init>(Parsers.scala:131)
      at scala.util.parsing.combinator.Parsers$Failure.<init>(Parsers.scala:158)
      at scala.util.parsing.combinator.Parsers$$anonfun$acceptIf$1.apply(Parsers.scala:489)
      ...
      at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:742)
      at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:71)
      at scala.util.parsing.json.JSON$.parseFull(JSON.scala:85)
      at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:32)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$getTopicCount(ZookeeperConsumerConnector.scala:422)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:460)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:437)
      at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
      at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:433)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.handleChildChange(ZookeeperConsumerConnector.scala:375)
      at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
      at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

      I ran into this on 0.7.0, but the code in trunk appears to be vulnerable to the same issue.

      Attachments

      1. TopicCount.scala.diff (2 kB, Nick Howard)
      2. kafka-379_0.8_v1.patch (8 kB, Jun Rao)

        Activity

        Jun Rao added a comment -

        Patched and committed to trunk too.

        Jun Rao added a comment -

        Evan,

        The 0.6 release is pre-Apache, so we won't be able to patch it. Sorry.

        Evan Chan added a comment -

        Do you guys want a patch for 0.6? I just created one and can attach it. I know, we should really upgrade to 0.7.

        Jun Rao added a comment -

        Thanks for the review. Rebased and committed to 0.8. Will port to 0.7.

        Joel Koshy added a comment -

        Forgot to add - can we get this patched on trunk as well?

        Neha Narkhede added a comment -

        +1. Looks good.

        Joel Koshy added a comment -

        +1

        Jun Rao added a comment -

        Attaching patch v1 for 0.8. It just creates a synchronized singleton that wraps Scala's JSON. Since JSON parsing is rarely used in Kafka, this is likely not a performance concern.
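A minimal sketch of the synchronized-singleton idea, not the attached patch. To stay self-contained it does not use `scala.util.parsing.json.JSON`; instead `FlatJsonParser` is a hypothetical stand-in (a tiny cursor-based parser for flat topic-count objects) whose mutable cursor makes it unsafe to share, and `SynchronizedJson` is an illustrative name.

```scala
// Hypothetical stand-in for scala.util.parsing.json.JSON: parses flat JSON objects
// such as {"topic1": 2, "topic2": 1} (string keys, non-negative Int values only).
// The cursor `pos` is mutable instance state, so one instance must never be used
// by two threads at the same time.
class FlatJsonParser {
  private var text = ""
  private var pos = 0

  def parse(s: String): Option[Map[String, Int]] = {
    text = s; pos = 0
    try {
      eat('{')
      var result = Map.empty[String, Int]
      var done = { ws(); peek == '}' }
      while (!done) {
        eat('"')
        val end = text.indexOf('"', pos)
        val key = text.substring(pos, end) // end == -1 throws IndexOutOfBoundsException
        pos = end + 1
        eat(':'); ws()
        val start = pos
        while (pos < text.length && text.charAt(pos).isDigit) pos += 1
        result += key -> text.substring(start, pos).toInt // no digits: NumberFormatException
        ws()
        done = peek == '}'
        if (!done) eat(',')
      }
      pos += 1 // consume the closing '}'
      ws()
      if (pos == text.length) Some(result) else None
    } catch {
      // NumberFormatException is a subclass of IllegalArgumentException.
      case _: IllegalArgumentException | _: IndexOutOfBoundsException => None
    }
  }

  private def peek: Char = text.charAt(pos)
  private def ws(): Unit = while (pos < text.length && text.charAt(pos).isWhitespace) pos += 1
  private def eat(c: Char): Unit = {
    ws()
    if (peek != c) throw new IllegalArgumentException(s"expected '$c' at $pos")
    pos += 1
  }
}

// Synchronized singleton (illustrative name): one shared parser, and every call
// holds `lock`, so the parser's mutable state is touched by one thread at a time.
object SynchronizedJson {
  private val lock = new Object
  private val parser = new FlatJsonParser

  def parseFull(input: String): Option[Map[String, Int]] =
    lock.synchronized { parser.parse(input) }
}
```

Every caller funnels through one lock, so parses are fully serialized. That is the trade-off the comment accepts: it is cheap when parsing is rare, but the lock would become a contention point if parsing were on a hot path.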

        Jun Rao added a comment -

        Thanks for the patch. In 0.7, the only usage of JSON is in TopicCount, and all usages are called from ZookeeperConsumerConnector.rebalance, which is synchronized. So it seems that we just need to create a new JSONParser instance per ZookeeperConsumerConnector instance. Creating a new JSONParser instance each time a parser is needed may be too expensive.
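A sketch of the per-connector alternative, assuming (as the comment says for 0.7) that every parse happens inside a synchronized rebalance. `FlatJsonParser` is again a hypothetical stand-in for the Scala JSON parser, and `ConnectorSketch` is a hypothetical, heavily simplified connector rather than the real `ZookeeperConsumerConnector` API.

```scala
// Hypothetical stand-in for scala.util.parsing.json.JSON: parses flat JSON objects
// such as {"topic1": 2, "topic2": 1} (string keys, non-negative Int values only).
// The cursor `pos` is mutable instance state, so one instance must never be used
// by two threads at the same time.
class FlatJsonParser {
  private var text = ""
  private var pos = 0

  def parse(s: String): Option[Map[String, Int]] = {
    text = s; pos = 0
    try {
      eat('{')
      var result = Map.empty[String, Int]
      var done = { ws(); peek == '}' }
      while (!done) {
        eat('"')
        val end = text.indexOf('"', pos)
        val key = text.substring(pos, end) // end == -1 throws IndexOutOfBoundsException
        pos = end + 1
        eat(':'); ws()
        val start = pos
        while (pos < text.length && text.charAt(pos).isDigit) pos += 1
        result += key -> text.substring(start, pos).toInt // no digits: NumberFormatException
        ws()
        done = peek == '}'
        if (!done) eat(',')
      }
      pos += 1 // consume the closing '}'
      ws()
      if (pos == text.length) Some(result) else None
    } catch {
      // NumberFormatException is a subclass of IllegalArgumentException.
      case _: IllegalArgumentException | _: IndexOutOfBoundsException => None
    }
  }

  private def peek: Char = text.charAt(pos)
  private def ws(): Unit = while (pos < text.length && text.charAt(pos).isWhitespace) pos += 1
  private def eat(c: Char): Unit = {
    ws()
    if (peek != c) throw new IllegalArgumentException(s"expected '$c' at $pos")
    pos += 1
  }
}

// Hypothetical connector: each instance creates its parser once and only uses it
// inside `rebalance`, which is synchronized on the connector itself. Two connectors
// in the same JVM therefore never share parser state.
class ConnectorSketch {
  private val parser = new FlatJsonParser

  def rebalance(consumerJsons: Seq[String]): Seq[Option[Map[String, Int]]] = synchronized {
    consumerJsons.map(json => parser.parse(json))
  }
}
```

The parser is allocated once per connector and reused, which avoids per-parse construction cost. The safety argument rests entirely on the caller's lock: any new code path that called `parser.parse` outside `rebalance` would reintroduce the race.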

        Joel Koshy added a comment -

        Thanks for reporting this - and the SI reference.

        This likely affects 0.8 in other places as well:
        git grep parseFull
        core/src/main/scala/kafka/consumer/TopicCount.scala: JSON.parseFull(topicCountString) match {
        core/src/main/scala/kafka/server/StateChangeCommand.scala: JSON.parseFull(requestJson) match {
        core/src/main/scala/kafka/utils/ZkUtils.scala: JSON.parseFull(jsonPartitionMap) match {

        Nick Howard added a comment -

        I patched our fork like this.

        The patch takes the parsing code from scala.util.parsing.json.JSON and puts it in a class instead of an object, so it's instantiable.
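A sketch of what an instantiable parser allows: once the parsing code lives in a `class` rather than a singleton `object`, a call site can create a private instance per parse, so no state is shared between threads. `FlatJsonParser` and `TopicCountSketch.parseTopicCount` are hypothetical names; the parser is a small stand-in, not the code copied from `scala.util.parsing.json.JSON` in the attached diff.

```scala
// Hypothetical stand-in for the parsing code moved into a class: parses flat JSON
// objects such as {"topic1": 2, "topic2": 1} (string keys, non-negative Int values
// only). Because it is a class, each caller can own a separate instance.
class FlatJsonParser {
  private var text = ""
  private var pos = 0

  def parse(s: String): Option[Map[String, Int]] = {
    text = s; pos = 0
    try {
      eat('{')
      var result = Map.empty[String, Int]
      var done = { ws(); peek == '}' }
      while (!done) {
        eat('"')
        val end = text.indexOf('"', pos)
        val key = text.substring(pos, end) // end == -1 throws IndexOutOfBoundsException
        pos = end + 1
        eat(':'); ws()
        val start = pos
        while (pos < text.length && text.charAt(pos).isDigit) pos += 1
        result += key -> text.substring(start, pos).toInt // no digits: NumberFormatException
        ws()
        done = peek == '}'
        if (!done) eat(',')
      }
      pos += 1 // consume the closing '}'
      ws()
      if (pos == text.length) Some(result) else None
    } catch {
      // NumberFormatException is a subclass of IllegalArgumentException.
      case _: IllegalArgumentException | _: IndexOutOfBoundsException => None
    }
  }

  private def peek: Char = text.charAt(pos)
  private def ws(): Unit = while (pos < text.length && text.charAt(pos).isWhitespace) pos += 1
  private def eat(c: Char): Unit = {
    ws()
    if (peek != c) throw new IllegalArgumentException(s"expected '$c' at $pos")
    pos += 1
  }
}

// Hypothetical call site: a fresh parser per call, so the instance is confined to
// the calling thread and needs no lock.
object TopicCountSketch {
  def parseTopicCount(json: String): Option[Map[String, Int]] =
    new FlatJsonParser().parse(json)
}
```

Nothing is shared, but each call pays for constructing a parser; with a real combinator-based parser that construction may be costly, which is the concern raised about creating one instance each time a parser is needed.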


          People

          • Assignee: Jun Rao
          • Reporter: Nick Howard
          • Votes: 1
          • Watchers: 5
