KAFKA-1415

Async producer.send can block forever if async.ProducerSendThread dies

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Auto Closed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: clients, producer
    • Labels: None
    • Environment: kafka_2.9.2-0.8.0.jar

    Description

      We noticed that if something goes fundamentally wrong inside the producer (in this case the jars were replaced out from under a running producer), asynchronous calls to producer.send can lock up forever.

      The log file showed the following exception:

      [2014-04-17 16:45:36,484] INFO Disconnecting from cn2:9092 (kafka.producer.SyncProducer)
      Exception in thread "ProducerSendThread-" java.lang.NoClassDefFoundError: kafka/producer/async/ProducerSendThread$$anonfun$run$1
              at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:46)
      Caused by: java.lang.ClassNotFoundException: kafka.producer.async.ProducerSendThread$$anonfun$run$1
              at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
              at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
              at java.security.AccessController.doPrivileged(Native Method)
              at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
              at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
              at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
              at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
              ... 1 more
      

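      However, my application continued running. jstack showed that all of the producer.send calls had locked up: one thread holds the producer's lock while parked in LinkedBlockingQueue.put, and the others are blocked waiting for that lock: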

      "SubscriberEventQueue0Executor-1" prio=10 tid=0x00002aaab0a88000 nid=0x44f5 waiting on condition [0x0000000044ac4000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x0000000790c47918> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
      	at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:98)
      	at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:90)
      	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
      	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:33)
      	at kafka.producer.Producer.asyncSend(Producer.scala:90)
      	at kafka.producer.Producer.send(Producer.scala:77)
      	- locked <0x0000000791768ee8> (a java.lang.Object)
      	at kafka.javaapi.producer.Producer.send(Producer.scala:33)
      	at com.mi.ahl.kafka.rmds2kafka.Bridge$1.onMarketData(Bridge.java:165)
      
         Locked ownable synchronizers:
      	- <0x0000000792205cd0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
      
      "SubscriberEventQueue1Executor-2" prio=10 tid=0x00002aaab0aa0000 nid=0x4511 waiting for monitor entry [0x0000000044dc7000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at kafka.producer.Producer.send(Producer.scala:71)
      	- waiting to lock <0x0000000791768ee8> (a java.lang.Object)
      	at kafka.javaapi.producer.Producer.send(Producer.scala:33)
      	at com.mi.ahl.kafka.rmds2kafka.Bridge$1.onMarketData(Bridge.java:165)
      
      "SubscriberEventQueue2Executor-3" prio=10 tid=0x00002aaab0ab6800 nid=0x4512 waiting for monitor entry [0x0000000044ec8000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at kafka.producer.Producer.send(Producer.scala:71)
      	- waiting to lock <0x0000000791768ee8> (a java.lang.Object)
      	at kafka.javaapi.producer.Producer.send(Producer.scala:33)
      	at com.mi.ahl.kafka.rmds2kafka.Bridge$1.onMarketData(Bridge.java:165)
      
      "SubscriberEventQueue3Executor-4" prio=10 tid=0x00002aaab0ab8800 nid=0x4513 waiting for monitor entry [0x0000000044fc9000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at kafka.producer.Producer.send(Producer.scala:71)
      	- waiting to lock <0x0000000791768ee8> (a java.lang.Object)
      	at kafka.javaapi.producer.Producer.send(Producer.scala:33)
      	at com.mi.ahl.kafka.rmds2kafka.Bridge$1.onMarketData(Bridge.java:165)
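
The mechanism behind these traces can be reproduced without Kafka. The following is only a minimal sketch using plain java.util.concurrent classes, not the producer's actual code: the class name DeadConsumerDemo, the thread names, and the queue capacity of 1 are all made up for illustration. It shows that once the only consumer of a bounded LinkedBlockingQueue has died, a caller using put() on a full queue stays parked until something interrupts it.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; it only imitates the shape of the problem.
final class DeadConsumerDemo {

    /** Returns true if the caller thread is still stuck in put() after the consumer has died. */
    static boolean producerBlocksAfterConsumerDies() {
        // Bounded queue (capacity chosen arbitrarily) standing in for the async send queue.
        final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);

        // Stand-in for the send thread: it dies at once with an Error, as in the log above.
        Thread consumer = new Thread(() -> {
            throw new NoClassDefFoundError("simulated");
        }, "FakeSendThread");
        consumer.setUncaughtExceptionHandler((t, e) -> { /* keep the demo output quiet */ });

        // Stand-in for the application thread calling send().
        Thread caller = new Thread(() -> {
            try {
                queue.put("first");  // succeeds and fills the queue
                queue.put("second"); // parks: nothing will ever drain the queue
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "FakeCaller");
        caller.setDaemon(true);

        try {
            consumer.start();
            consumer.join();          // the consumer is now dead
            caller.start();
            caller.join(500);         // give the caller time to reach the second put()
            boolean stuck = caller.isAlive() && queue.remainingCapacity() == 0;
            caller.interrupt();       // release the parked thread so the demo can exit
            caller.join();
            return stuck;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("caller stuck in put(): " + producerBlocksAfterConsumerDies());
    }
}
```

In the real traces the parked caller additionally holds the producer's lock, which is why every other caller ends up BLOCKED rather than WAITING.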
      

      Expectation:
      producer.send should throw an exception if something has fundamentally broken within the client, rather than hang forever.
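
One way the expected behaviour could look is sketched below. This is an assumption about a possible approach, not the Kafka implementation: FailFastQueue, its constructor parameters, and the choice of IllegalStateException are hypothetical. The idea is to replace the unbounded put() with a timed offer() in a loop and to check on each pass that the worker thread is still alive.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper; names and exception type are illustrative only.
final class FailFastQueue<T> {
    private final LinkedBlockingQueue<T> queue;
    private final Thread worker;        // the thread expected to drain the queue
    private final long pollIntervalMs;  // how often to re-check the worker while waiting

    /** The worker must already have been started; an unstarted thread also reports isAlive() == false. */
    FailFastQueue(int capacity, Thread worker, long pollIntervalMs) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.worker = worker;
        this.pollIntervalMs = pollIntervalMs;
    }

    /** Enqueues the message, or throws if the worker thread is no longer running. */
    void send(T message) {
        try {
            while (true) {
                if (!worker.isAlive()) {
                    throw new IllegalStateException(
                        "worker thread '" + worker.getName() + "' is not running");
                }
                // Timed offer instead of put(): wakes up periodically to re-check the worker.
                if (queue.offer(message, pollIntervalMs, TimeUnit.MILLISECONDS)) {
                    return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while enqueueing", e);
        }
    }

    /** Called by the worker to take the next message. */
    T take() throws InterruptedException {
        return queue.take();
    }
}
```

This sketch still has a gap: a message accepted just before the worker dies is never delivered, so a complete fix would also need to report the worker's failure for messages already queued. Separately, as far as I can tell from the 0.8 producer configuration, setting queue.enqueue.timeout.ms to a non-negative value makes the enqueue give up instead of blocking indefinitely, which would bound the hang but not report the dead thread.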

          People

            Assignee: Jun Rao (junrao)
            Reporter: James Blackburn (jblackburn)