Kafka / KAFKA-1415

Async producer.send can block forever if async.ProducerSendThread dies


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Auto Closed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: clients, producer
    • Labels: None
    • Environment: kafka_2.9.2-0.8.0.jar

    Description

      We noticed that if something goes fundamentally wrong (in this case, the jars were replaced under a running producer's feet), async calls to producer.send can lock up forever.

      The log file showed the following exception:

      [2014-04-17 16:45:36,484] INFO Disconnecting from cn2:9092 (kafka.producer.SyncProducer)
      Exception in thread "ProducerSendThread-" java.lang.NoClassDefFoundError: kafka/producer/async/ProducerSendThread$$anonfun$run$1
              at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:46)
      Caused by: java.lang.ClassNotFoundException: kafka.producer.async.ProducerSendThread$$anonfun$run$1
              at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
              at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
              at java.security.AccessController.doPrivileged(Native Method)
              at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
              at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
              at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
              at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
              ... 1 more
      

      However, my application continued running. jstack showed that the producer.send calls had all locked up:

      "SubscriberEventQueue0Executor-1" prio=10 tid=0x00002aaab0a88000 nid=0x44f5 waiting on condition [0x0000000044ac4000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x0000000790c47918> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
      	at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:98)
      	at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:90)
      	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
      	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:33)
      	at kafka.producer.Producer.asyncSend(Producer.scala:90)
      	at kafka.producer.Producer.send(Producer.scala:77)
      	- locked <0x0000000791768ee8> (a java.lang.Object)
      	at kafka.javaapi.producer.Producer.send(Producer.scala:33)
      	at com.mi.ahl.kafka.rmds2kafka.Bridge$1.onMarketData(Bridge.java:165)
      
         Locked ownable synchronizers:
      	- <0x0000000792205cd0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
      
      "SubscriberEventQueue1Executor-2" prio=10 tid=0x00002aaab0aa0000 nid=0x4511 waiting for monitor entry [0x0000000044dc7000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at kafka.producer.Producer.send(Producer.scala:71)
      	- waiting to lock <0x0000000791768ee8> (a java.lang.Object)
      	at kafka.javaapi.producer.Producer.send(Producer.scala:33)
      	at com.mi.ahl.kafka.rmds2kafka.Bridge$1.onMarketData(Bridge.java:165)
      
      "SubscriberEventQueue2Executor-3" prio=10 tid=0x00002aaab0ab6800 nid=0x4512 waiting for monitor entry [0x0000000044ec8000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at kafka.producer.Producer.send(Producer.scala:71)
      	- waiting to lock <0x0000000791768ee8> (a java.lang.Object)
      	at kafka.javaapi.producer.Producer.send(Producer.scala:33)
      	at com.mi.ahl.kafka.rmds2kafka.Bridge$1.onMarketData(Bridge.java:165)
      
      "SubscriberEventQueue3Executor-4" prio=10 tid=0x00002aaab0ab8800 nid=0x4513 waiting for monitor entry [0x0000000044fc9000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at kafka.producer.Producer.send(Producer.scala:71)
      	- waiting to lock <0x0000000791768ee8> (a java.lang.Object)
      	at kafka.javaapi.producer.Producer.send(Producer.scala:33)
      	at com.mi.ahl.kafka.rmds2kafka.Bridge$1.onMarketData(Bridge.java:165)
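
Reading the dump above: the first thread is parked in LinkedBlockingQueue.put while holding the producer's lock, and the other three are blocked waiting for that same lock. The following is a minimal sketch of that mechanism outside Kafka, assuming a bounded queue whose only consumer thread has died. The class name StuckQueueDemo, the queue capacity and the simulated error are illustrative and are not taken from the Kafka source.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class StuckQueueDemo {
    /** Returns the state of a producer thread once the only consumer thread has died. */
    static Thread.State producerStateAfterConsumerDeath() throws InterruptedException {
        // Bounded queue standing in for the async producer's internal queue (capacity is arbitrary).
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);

        // Hypothetical consumer that dies immediately with an Error, like ProducerSendThread in the log.
        Thread consumer = new Thread(() -> {
            throw new NoClassDefFoundError("simulated");
        }, "consumer");
        consumer.setUncaughtExceptionHandler((t, e) -> { /* keep the demo output quiet */ });
        consumer.start();
        consumer.join();

        // The producer fills the queue, then parks in put() because nothing drains it any more.
        Thread producer = new Thread(() -> {
            try {
                queue.put("first");   // succeeds, the queue is now full
                queue.put("second");  // blocks until space is available, which never happens
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        producer.setDaemon(true);
        producer.start();

        // Poll for up to five seconds until the producer has parked.
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (producer.getState() != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        Thread.State state = producer.getState();
        producer.interrupt(); // release the thread so the demo can exit
        return state;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("producer state: " + producerStateAfterConsumerDeath());
    }
}
```

Nothing in put() knows that the consumer is gone, so the caller waits indefinitely unless it is interrupted.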
      

      Expectation:
      producer.send should throw an exception if something has fundamentally broken within the client, rather than hang forever.
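
One way to get the fail-fast behaviour described above can be sketched as follows. This is an illustration under stated assumptions, not the Kafka implementation or the fix adopted for this issue: the class FailFastEnqueue, the method enqueue and the pollMillis parameter are hypothetical names. The idea is to replace an unbounded put() with a timed offer() and to check between attempts that the consumer thread is still alive.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class FailFastEnqueue {
    /**
     * Enqueues an item, throwing instead of blocking forever if the consumer thread is dead.
     * Assumes the consumer thread has already been started.
     */
    static <T> void enqueue(BlockingQueue<T> queue, T item, Thread consumer, long pollMillis)
            throws InterruptedException {
        while (true) {
            // A dead consumer will never drain the queue, so surface the failure to the caller.
            if (!consumer.isAlive()) {
                throw new IllegalStateException(
                        "consumer thread '" + consumer.getName() + "' is not running");
            }
            // Wait a bounded time for space, then loop to re-check consumer liveness.
            if (queue.offer(item, pollMillis, TimeUnit.MILLISECONDS)) {
                return;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);

        // Stand-in for a send thread that has already terminated.
        Thread consumer = new Thread(() -> { }, "send-thread");
        consumer.start();
        consumer.join();

        try {
            enqueue(queue, "message", consumer, 100);
        } catch (IllegalStateException e) {
            System.out.println("send failed fast: " + e.getMessage());
        }
    }
}
```

The polling interval trades detection latency against wake-ups while the queue is full. Separately, the 0.8 producer configuration documents a queue.enqueue.timeout.ms setting that bounds how long an enqueue may wait; this report does not say whether it was set or whether it would have helped here.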

          People

            Assignee: Jun Rao (junrao)
            Reporter: James Blackburn (jblackburn)
            Votes: 0
            Watchers: 3
