KAFKA-145: Kafka server mirror shutdown bug

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7
    • Component/s: core
    • Labels: None

      Description

      When a machine that is mirroring data from another Kafka broker is shut down, it hits the following exception, effectively dropping data. The shutdown API needs to be fixed to first shut down the consumer threads, drain all the data to the producer, and only then shut down the producer.

      FATAL kafka.server.EmbeddedConsumer - kafka.producer.async.QueueClosedException: Attempt to add event to a closed queue.
      kafka.producer.async.QueueClosedException: Attempt to add event to a closed queue.
      at kafka.producer.async.AsyncProducer.send(AsyncProducer.scala:87)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
      at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
      at scala.collection.immutable.List.foreach(List.scala:45)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:131)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:130)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
      at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:130)
      at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
      at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
      at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
      at kafka.producer.Producer.zkSend(Producer.scala:144)
      at kafka.producer.Producer.send(Producer.scala:106)
      at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:136)
      at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:134)
      at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
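
The shutdown ordering proposed in the description (stop the consumer threads, let them drain what they have already fetched into the producer, then close the producer) can be illustrated with a minimal sketch. This is a simplified model, not the code from the attached patch: every class and method name below (MirrorShutdownSketch, SketchProducer, SketchConsumerThread, runDemo) is hypothetical, and the producer is reduced to a list that rejects events once closed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model of a mirroring broker. All names are hypothetical;
// this is not Kafka's EmbeddedConsumer or AsyncProducer.
class MirrorShutdownSketch {

    // Stand-in for an async producer whose queue rejects events once closed.
    static class SketchProducer {
        private final List<String> sent = new ArrayList<>();
        private boolean closed = false;

        synchronized void send(String event) {
            if (closed) {
                throw new IllegalStateException("Attempt to add event to a closed queue.");
            }
            sent.add(event);
        }

        synchronized void close() { closed = true; }

        synchronized int sentCount() { return sent.size(); }
    }

    // Stand-in for a consumer thread that forwards fetched messages to the producer.
    static class SketchConsumerThread extends Thread {
        private final BlockingQueue<String> fetched;
        private final SketchProducer producer;
        private volatile boolean running = true;

        SketchConsumerThread(BlockingQueue<String> fetched, SketchProducer producer) {
            this.fetched = fetched;
            this.producer = producer;
        }

        @Override
        public void run() {
            try {
                // Keep forwarding until asked to stop AND the fetched buffer is empty.
                while (running || !fetched.isEmpty()) {
                    String msg = fetched.poll(10, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        producer.send(msg);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        void requestStop() { running = false; }
    }

    // Shut down in the order the description asks for.
    static void shutdown(List<SketchConsumerThread> consumers, SketchProducer producer)
            throws InterruptedException {
        for (SketchConsumerThread c : consumers) c.requestStop(); // 1. stop consumers
        for (SketchConsumerThread c : consumers) c.join();        // 2. wait for the drain
        producer.close();                                         // 3. only now close
    }

    // Preloads n messages, mirrors them, shuts down, and returns how many arrived.
    static int runDemo(int n) {
        BlockingQueue<String> fetched = new LinkedBlockingQueue<>();
        for (int i = 0; i < n; i++) fetched.add("message-" + i);
        SketchProducer producer = new SketchProducer();
        List<SketchConsumerThread> consumers = new ArrayList<>();
        consumers.add(new SketchConsumerThread(fetched, producer));
        consumers.get(0).start();
        try {
            shutdown(consumers, producer);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return producer.sentCount();
    }

    public static void main(String[] args) {
        System.out.println("forwarded " + runDemo(1000) + " of 1000");
    }
}
```

Closing the producer before the join in shutdown() would reproduce the failure mode in the trace above: a consumer thread still holding fetched messages would call send() on a closed producer.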

      1. KAFKA-145.patch
        5 kB
        Neha Narkhede
      2. KAFKA-145.patch
        5 kB
        Neha Narkhede

        Activity

        Neha Narkhede made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]
        Neha Narkhede made changes -
        Attachment KAFKA-145.patch [ 12497912 ]
        Neha Narkhede made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Fix Version/s 0.7 [ 12317243 ]
        Neha Narkhede made changes -
        Attachment KAFKA-145.patch [ 12497895 ]
        Neha Narkhede made changes -
        Field Original Value New Value
        Assignee Neha Narkhede [ nehanarkhede ]
        Neha Narkhede created issue -

          People

          • Assignee:
            Neha Narkhede
            Reporter:
            Neha Narkhede
          • Votes:
            0
            Watchers:
            0

