  1. Kafka
  2. KAFKA-1764

ZookeeperConsumerConnector could put multiple shutdownCommand to the same data chunk queue.

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.2.0
    • Component/s: None
    • Labels:
      None

      Description

      In ZookeeperConsumerConnector shutdown(), we can put multiple shutdownCommand entries into the same data chunk queue when several topics share that queue in topicThreadIdAndQueues.

      Copied from the email thread for documentation:

      In our case there is only one consumer stream for all the topics, and the
      data chunk queue capacity is set to 1. The execution sequence that causes
      the problem is as follows:
      1. ZookeeperConsumerConnector shutdown() is called and tries to put a
      shutdownCommand into the queue of every entry in topicThreadIdAndQueues.
      Since there is only one queue, multiple shutdownCommands are put into it.
      2. In sendShutdownToAllQueues(), between queue.clear() and
      queue.put(shutdownCommand), the consumer iterator receives the
      shutdownCommand and puts it back into the data chunk queue. After that,
      ZookeeperConsumerConnector tries to put another shutdownCommand into the
      data chunk queue, which is already full, so the call blocks forever.
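
      The sequence above can be illustrated with a minimal Java sketch. It is
      not the Kafka code and not necessarily what the attached patches do. The
      class name, the String stand-in for shutdownCommand and the helper
      methods are all hypothetical. The sketch assumes that one way to avoid
      the problem is to send the command once per distinct queue instance
      instead of once per entry in topicThreadIdAndQueues. A non-blocking
      offer() stands in for put() so that the sketch cannot hang.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ShutdownQueueSketch {
    // Hypothetical stand-in for ZookeeperConsumerConnector.shutdownCommand.
    static final String SHUTDOWN_COMMAND = "shutdownCommand";

    // Collects the distinct queue instances (compared by identity) behind the map values.
    static Set<BlockingQueue<String>> distinctQueues(Map<String, BlockingQueue<String>> topicThreadIdAndQueues) {
        Set<BlockingQueue<String>> distinct =
            Collections.newSetFromMap(new IdentityHashMap<BlockingQueue<String>, Boolean>());
        distinct.addAll(topicThreadIdAndQueues.values());
        return distinct;
    }

    // Clears each distinct queue once and enqueues one shutdown command.
    // Returns the number of commands that were accepted.
    static int sendShutdownToDistinctQueues(Map<String, BlockingQueue<String>> topicThreadIdAndQueues) {
        int sent = 0;
        for (BlockingQueue<String> queue : distinctQueues(topicThreadIdAndQueues)) {
            queue.clear();
            // offer() returns false on a full queue, where put() would park the thread.
            if (queue.offer(SHUTDOWN_COMMAND)) {
                sent++;
            }
        }
        return sent;
    }

    // Models step 2: the queue is cleared, a command is put back by the consumer,
    // and the connector then tries to add another one. Returns true if that
    // second command is rejected, which is where put() would block.
    static boolean secondCommandWouldBlock(BlockingQueue<String> queue) {
        queue.clear();
        queue.offer(SHUTDOWN_COMMAND);         // command put back by the consumer iterator
        return !queue.offer(SHUTDOWN_COMMAND); // the connector's next attempt
    }

    public static void main(String[] args) {
        // Two topic-thread ids sharing one queue of capacity 1, as in the report.
        BlockingQueue<String> shared = new LinkedBlockingQueue<String>(1);
        Map<String, BlockingQueue<String>> queues = new LinkedHashMap<String, BlockingQueue<String>>();
        queues.put("topicA-thread-0", shared);
        queues.put("topicB-thread-0", shared);

        System.out.println("map entries: " + queues.size());
        System.out.println("distinct queues: " + distinctQueues(queues).size());
        System.out.println("commands sent: " + sendShutdownToDistinctQueues(queues));
        System.out.println("second command would block: "
            + secondCommandWouldBlock(new LinkedBlockingQueue<String>(1)));
    }
}
```

      Comparing queues by identity keeps the sketch independent of how a
      queue implementation defines equals(). With a single shared queue, the
      connector then touches that queue only once during shutdown.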

      The thread stack trace is as below:

      "Thread-23" #58 prio=5 os_prio=0 tid=0x00007ff440004800 nid=0x40a waiting on condition [0x00007ff4f0124000]
         java.lang.Thread.State: WAITING (parking)
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x0000000680b96bf0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
              at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
              at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:262)
              at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:259)
              at scala.collection.Iterator$class.foreach(Iterator.scala:727)
              at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
              at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
              at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
              at kafka.consumer.ZookeeperConsumerConnector.sendShutdownToAllQueues(ZookeeperConsumerConnector.scala:259)
              at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:199)
              at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:192)
              - locked <0x0000000680dd5848> (a java.lang.Object)
              at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
              at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
              at scala.collection.immutable.List.foreach(List.scala:318)
              at kafka.tools.MirrorMaker$.cleanShutdown(MirrorMaker.scala:185)
      

        Attachments

        1. KAFKA-1764_2014-11-13_23:57:51.patch
          3 kB
          Jiangjie Qin
        2. KAFKA-1764_2014-11-12_14:05:35.patch
          2 kB
          Jiangjie Qin
        3. KAFKA-1764.patch
          1 kB
          Jiangjie Qin

            People

            • Assignee:
              becket_qin Jiangjie Qin
              Reporter:
              becket_qin Jiangjie Qin