Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-1595

Remove deprecated and slower scala JSON parser

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.1.1
    • Fix Version/s: 1.0.0
    • Component/s: None
    • Labels:

      Description

      The following issue is created as a follow up suggested by Jun Rao
      in a kafka news group message with the Subject

      "Blocking Recursive parsing from kafka.consumer.TopicCount$.constructTopicCount"

      SUMMARY:
      An issue was detected in a typical cluster of 3 kafka instances backed
      by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3,
      java version 1.7.0_65). On consumer end, when consumers get recycled,
      there is a troubling JSON parsing recursion which takes a busy lock and
      blocks consumers thread pool.

      In 0.8.1.1 scala client library ZookeeperConsumerConnector.scala:355 takes
      a global lock (0x00000000d3a7e1d0) during the rebalance, and fires an
      expensive JSON parsing, while keeping the other consumers from shutting
      down, see, e.g,

      at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)

      The deep recursive JSON parsing should be deprecated in favor
      of a better JSON parser, see, e.g,
      http://engineering.ooyala.com/blog/comparing-scala-json-libraries?

      DETAILS:
      The first dump is for a recursive blocking thread holding the lock for 0x00000000d3a7e1d0
      and the subsequent dump is for a waiting thread.

      (Please grep for 0x00000000d3a7e1d0 to see the locked object.)
      Â

      ------------------------8<----------------------------------------
      "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor"
      prio=10 tid=0x00007f24dc285800 nid=0xda9 runnable [0x00007f249e40b000]
      java.lang.Thread.State: RUNNABLE
      at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722)
      at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726)
      at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737)
      at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.lexical.Scanners$Scanner.<init>(Scanners.scala:49)
      at scala.util.parsing.combinator.lexical.Scanners$Scanner.rest(Scanners.scala:60)
      at scala.util.parsing.combinator.lexical.Scanners$Scanner.rest(Scanners.scala:44)
      at scala.util.parsing.combinator.Parsers$$anonfun$acceptIf$1.apply(Parsers.scala:608)
      at scala.util.parsing.combinator.Parsers$$anonfun$acceptIf$1.apply(Parsers.scala:606)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:736)
      at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
      at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
      at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
      at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
      at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:54)
      at scala.util.parsing.json.JSON$.parseFull(JSON.scala:68)
      at kafka.utils.Json$.liftedTree1$1(Json.scala:37)
      at kafka.utils.Json$.parseFull(Json.scala:36)

      • locked <0x00000000c5a7cdd8> (a java.lang.Object)
        at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:56)
        at kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:678)
        at kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:677)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.utils.ZkUtils$.getConsumersPerTopic(ZkUtils.scala:677)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:437)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:408)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:402)
      • locked <0x00000000d3a7e1d0> (a java.lang.Object)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)
        --------------------------------------------------------------------

      Many BLOCKED Threads had stack trace similar to the following stack waiting to lock 0x00000000d3a7e1d0

      ------------------------8<----------------------------------------
      "application-my-context-105" prio=10 tid=0x00007f24e45aa800 nid=0x3494 waiting for monitor entry [0x00007f24afe26000]
      java.lang.Thread.State: BLOCKED (on object monitor)
      at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)

      • waiting to lock <0x00000000d3a7e1d0> (a java.lang.Object)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
        at com.custom1.Consumer.cancel(Consumer.java:312)
        at com.custom1.Consumer.run(Consumer.java:302)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
        -------------------------------------------------------------------

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                ijuma Ismael Juma
                Reporter:
                jhooda Jagbir
                Reviewer:
                Rajini Sivaram
              • Votes:
                0 Vote for this issue
                Watchers:
                11 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: