Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-145

Kafka server mirror shutdown bug

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 0.7
    • core
    • None

    Description

      When a machine that is mirroring data off of another Kafka broker is shutdown, it runs into the following exception, effectively dropping data. The shutdown API needs to be fixed to first shutdown the consumer threads, drain all the data to the producer, and only then shutdown the producer.

      FATAL kafka.server.EmbeddedConsumer - kafka.producer.async.QueueClosedException: Attempt to add event to a closed queue.kafka.producer.async.QueueClosedException: Attempt to add event to a closed queue.
      at kafka.producer.async.AsyncProducer.send(AsyncProducer.scala:87)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
      at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
      at scala.collection.immutable.List.foreach(List.scala:45)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:131)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:130)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
      at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:130)
      at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
      at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
      at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
      at kafka.producer.Producer.zkSend(Producer.scala:144)
      at kafka.producer.Producer.send(Producer.scala:106)
      at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:136)
      at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:134)
      at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)

      Attachments

        1. KAFKA-145.patch
          5 kB
          Neha Narkhede
        2. KAFKA-145.patch
          5 kB
          Neha Narkhede

        Activity

          People

            nehanarkhede Neha Narkhede
            nehanarkhede Neha Narkhede
            Votes:
            0 Vote for this issue
            Watchers:
            0 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: