Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-145

Kafka server mirror shutdown bug

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7
    • Component/s: core
    • Labels:
      None

      Description

      When a machine that is mirroring data off of another Kafka broker is shutdown, it runs into the following exception, effectively dropping data. The shutdown API needs to be fixed to first shutdown the consumer threads, drain all the data to the producer, and only then shutdown the producer.

      FATAL kafka.server.EmbeddedConsumer - kafka.producer.async.QueueClosedException: Attempt to add event to a closed queue.kafka.producer.async.QueueClosedException: Attempt to add event to a closed queue.
      at kafka.producer.async.AsyncProducer.send(AsyncProducer.scala:87)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
      at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
      at scala.collection.immutable.List.foreach(List.scala:45)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:131)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:130)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
      at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:130)
      at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
      at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
      at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
      at kafka.producer.Producer.zkSend(Producer.scala:144)
      at kafka.producer.Producer.send(Producer.scala:106)
      at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:136)
      at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:134)
      at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)

        Attachments

        1. KAFKA-145.patch
          5 kB
          Neha Narkhede
        2. KAFKA-145.patch
          5 kB
          Neha Narkhede

          Activity

            People

            • Assignee:
              nehanarkhede Neha Narkhede
              Reporter:
              nehanarkhede Neha Narkhede
            • Votes:
              0 Vote for this issue
              Watchers:
              0 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: