Kafka / KAFKA-379

TopicCount.constructTopicCount isn't thread-safe

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7, 0.8.0
    • Fix Version/s: 0.7, 0.8.0
    • Component/s: clients
    • Labels:

      Description

      TopicCount uses scala.util.parsing.json.JSON, which isn't thread-safe (see https://issues.scala-lang.org/browse/SI-4929).

      If you have multiple consumers within the same JVM, and they all rebalance at the same time, you can get errors like the following:

      [...] kafka.consumer.TopicCount$.constructTopicCount:39] ERROR: error parsing consumer json string [...]
      java.lang.NullPointerException
      at scala.util.parsing.combinator.Parsers$NoSuccess.<init>(Parsers.scala:131)
      at scala.util.parsing.combinator.Parsers$Failure.<init>(Parsers.scala:158)
      at scala.util.parsing.combinator.Parsers$$anonfun$acceptIf$1.apply(Parsers.scala:489)
      ...
      at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:742)
      at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:71)
      at scala.util.parsing.json.JSON$.parseFull(JSON.scala:85)
      at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:32)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$getTopicCount(ZookeeperConsumerConnector.scala:422)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:460)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:437)
      at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
      at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:433)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.handleChildChange(ZookeeperConsumerConnector.scala:375)
      at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
      at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

      I ran into this on 0.7.0, but the code in trunk appears to be vulnerable to the same issue.

      1. kafka-379_0.8_v1.patch
        8 kB
        Jun Rao
      2. TopicCount.scala.diff
        2 kB
        Nick Howard

        Activity

        Nick Howard created issue -
        Nick Howard added a comment -

        I patched our fork like this.

        The patch takes the parsing code from scala.util.parsing.json.JSON and puts it in a class instead of an object, so it's instantiable.
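
        A minimal sketch of the "class instead of an object" idea, not the attached diff. FlatJsonParser is a hypothetical hand-written parser for flat objects such as {"topic1": 2}. It only illustrates that parser state held in a class belongs to one instance, so each caller can build its own.

```scala
// Hypothetical parser for flat JSON objects with string keys and
// non-negative integer values. All mutable state (input, pos) is
// per-instance, so two instances never interfere with each other.
class FlatJsonParser {
  private var input = ""
  private var pos = 0

  private def skipSpaces(): Unit =
    while (pos < input.length && input(pos).isWhitespace) pos += 1

  // Consume the expected character or fail with IllegalArgumentException.
  private def expect(c: Char): Unit = {
    skipSpaces()
    require(pos < input.length && input(pos) == c, s"expected '$c' at $pos")
    pos += 1
  }

  private def readString(): String = {
    expect('"')
    val start = pos
    while (pos < input.length && input(pos) != '"') pos += 1
    val s = input.substring(start, pos)
    expect('"')
    s
  }

  private def readInt(): Int = {
    skipSpaces()
    val start = pos
    while (pos < input.length && input(pos).isDigit) pos += 1
    require(pos > start, s"expected digits at $start")
    input.substring(start, pos).toInt
  }

  def parse(s: String): Map[String, Int] = {
    input = s
    pos = 0
    var result = Map.empty[String, Int]
    expect('{')
    skipSpaces()
    var more = pos < input.length && input(pos) != '}'
    while (more) {
      val key = readString()
      expect(':')
      result += (key -> readInt())
      skipSpaces()
      if (pos < input.length && input(pos) == ',') pos += 1
      else more = false
    }
    expect('}')
    result
  }
}
```

        Each thread would call new FlatJsonParser().parse(...) on its own instance. The cost is one allocation per parse.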

        Nick Howard made changes -
        Field Original Value New Value
        Attachment TopicCount.scala.diff [ 12533891 ]
        Joel Koshy added a comment -

        Thanks for reporting this - and the SI reference.

        This likely affects 0.8 in other places as well:
        git grep parseFull
        core/src/main/scala/kafka/consumer/TopicCount.scala: JSON.parseFull(topicCountString) match {
        core/src/main/scala/kafka/server/StateChangeCommand.scala: JSON.parseFull(requestJson) match {
        core/src/main/scala/kafka/utils/ZkUtils.scala: JSON.parseFull(jsonPartitionMap) match {

        Jun Rao added a comment -

        Thanks for the patch. In 0.7, the only usage of JSON is in TopicCount, and all usages are called from ZookeeperConsumerConnector.rebalance, which is synchronized. So it seems that we just need to create one JSONParser instance per ZookeeperConsumerConnector instance. Creating a new JSONParser instance each time a parser is needed may be too expensive.
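
        A rough sketch of the per-connector arrangement described above, under stated assumptions. JsonParser and ConsumerConnector are hypothetical stand-ins, not Kafka classes. The stand-in parser only counts keys in a flat object. The point is that the parser is created once per connector and touched only while that connector's lock is held.

```scala
// Hypothetical stand-in for an instantiable parser that is not thread-safe:
// it keeps per-instance scratch state while it works.
class JsonParser {
  private var scratch = 0

  // Counts ':' separators, i.e. the number of keys in a flat JSON object
  // whose keys and values contain no ':' themselves.
  def countKeys(json: String): Int = {
    scratch = 0
    json.foreach(c => if (c == ':') scratch += 1)
    scratch
  }
}

// Hypothetical connector: one parser per connector, reused across calls and
// only used inside the synchronized rebalance method.
class ConsumerConnector(val id: String) {
  private val parser = new JsonParser
  private val rebalanceLock = new Object

  def rebalance(topicCountJson: String): Int =
    rebalanceLock.synchronized {
      parser.countKeys(topicCountJson)
    }
}
```

        Two connectors in the same JVM hold separate parsers, so simultaneous rebalances do not share parser state, and no parser is allocated per call.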

        Jun Rao added a comment -

        Attached patch v1 for 0.8. It adds a synchronized singleton that wraps Scala's JSON. Since JSON parsing is rarely used in Kafka, this is unlikely to be a performance concern.
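
        A minimal sketch of a synchronized singleton wrapper, assuming a stand-in parser. This is not the contents of the attached patch. UnsafeJson and SyncJson are hypothetical names. UnsafeJson plays the role of scala.util.parsing.json.JSON and only handles flat objects with integer values.

```scala
import scala.util.Try

// Hypothetical stand-in for a parser that must not be entered by two
// threads at once. Understands only flat objects like {"topic1": 2}.
object UnsafeJson {
  def parseFull(input: String): Option[Map[String, Int]] = {
    val s = input.trim
    if (s.length < 2 || !s.startsWith("{") || !s.endsWith("}")) None
    else {
      val body = s.substring(1, s.length - 1).trim
      if (body.isEmpty) Some(Map.empty[String, Int])
      else Try {
        body.split(",").map { entry =>
          // A malformed entry raises MatchError, which Try turns into None.
          entry.split(":") match {
            case Array(k, v) =>
              k.trim.stripPrefix("\"").stripSuffix("\"") -> v.trim.toInt
          }
        }.toMap
      }.toOption
    }
  }
}

// Synchronized singleton: every caller goes through the same lock, so the
// wrapped parser is never entered by two threads at the same time.
object SyncJson {
  private val lock = new Object

  def parseFull(input: String): Option[Map[String, Int]] =
    lock.synchronized {
      UnsafeJson.parseFull(input)
    }
}
```

        Call sites would switch from the raw parser to SyncJson.parseFull. All parsing in the JVM is then serialized, which is the trade-off the comment above accepts.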

        Jun Rao made changes -
        Attachment kafka-379_0.8_v1.patch [ 12538804 ]
        Joel Koshy added a comment -

        +1

        Neha Narkhede added a comment -

        +1. Looks good.

        Joel Koshy added a comment -

        Forgot to add - can we get this patched on trunk as well?

        Joel Koshy made changes -
        Labels bugs
        Jun Rao added a comment -

        Thanks for the review. Rebased and committed to 0.8. Will port to 0.7.

        Jun Rao made changes -
        Assignee Jun Rao [ junrao ]
        Evan Chan added a comment -

        Do you guys want a patch for 0.6? I just created one and can attach it. I know, we should really upgrade to 0.7.

        Jun Rao added a comment -

        Evan,

        The 0.6 release is pre-Apache, so we won't be able to patch it. Sorry.

        Jun Rao made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Jun Rao added a comment -

        Patched and committed to trunk too.

        Jun Rao made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Fix Version/s 0.8 [ 12317244 ]
        Fix Version/s 0.7 [ 12317243 ]
        Resolution Fixed [ 1 ]
        Jun Rao made changes -
        Status Resolved [ 5 ] Closed [ 6 ]
        Transition                     Time In Source Status   Execution Times   Last Executer   Last Execution Date
        Open -> Patch Available        81d 8h 4m               1                 Jun Rao         18/Sep/12 06:05
        Patch Available -> Resolved    32s                     1                 Jun Rao         18/Sep/12 06:06
        Resolved -> Closed             6s                      1                 Jun Rao         18/Sep/12 06:06

          People

          • Assignee: Jun Rao
          • Reporter: Nick Howard
          • Votes: 1
          • Watchers: 5
