Kafka / KAFKA-145

Kafka server mirror shutdown bug

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7
    • Component/s: core
    • Labels:
      None

      Description

      When a machine that is mirroring data from another Kafka broker is shut down, it hits the following exception and effectively drops data. The shutdown API needs to be fixed so that it first shuts down the consumer threads, drains all remaining data to the producer, and only then shuts down the producer.

      FATAL kafka.server.EmbeddedConsumer - kafka.producer.async.QueueClosedException: Attempt to add event to a closed queue.
      kafka.producer.async.QueueClosedException: Attempt to add event to a closed queue.
      at kafka.producer.async.AsyncProducer.send(AsyncProducer.scala:87)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
      at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
      at scala.collection.immutable.List.foreach(List.scala:45)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:131)
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:130)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
      at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:130)
      at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
      at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
      at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
      at kafka.producer.Producer.zkSend(Producer.scala:144)
      at kafka.producer.Producer.send(Producer.scala:106)
      at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:136)
      at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:134)
      at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)

      1. KAFKA-145.patch
        5 kB
        Neha Narkhede
      2. KAFKA-145.patch
        5 kB
        Neha Narkhede

        Activity

        Neha Narkhede added a comment -

        This patch corrects the shutdown behavior of Kafka mirroring, i.e. EmbeddedConsumer. It first shuts down the new-topic watcher, then the ZooKeeper consumer connector. After this, it stops the mirroring threads. At this point, all mirroring threads have finished mirroring all the data they have consumed. Only then does it shut down the producer.

        This ensures that the producer is not shut down before the mirroring threads have finished their work, thereby avoiding the data loss caused by QueueClosedException.
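
        The ordering described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in the patch: SketchProducer, mirrorAndShutDown, and the in-memory queue are hypothetical stand-ins for the real producer, EmbeddedConsumer, and consumer streams. The point it shows is that the producer is closed only after a latch confirms the mirroring thread has forwarded everything it consumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for illustration only; none of these are Kafka classes.
class MirrorShutdownSketch {

    // Stand-in for the async producer: rejects sends once it has been closed.
    static final class SketchProducer {
        private final List<String> sent = new ArrayList<>();
        private boolean closed = false;

        synchronized void send(String message) {
            if (closed) {
                throw new IllegalStateException("Attempt to add event to a closed queue.");
            }
            sent.add(message);
        }

        synchronized void close() {
            closed = true;
        }

        synchronized int sentCount() {
            return sent.size();
        }
    }

    // Mirrors messageCount buffered messages, shuts down in order, and
    // returns how many messages reached the producer.
    static int mirrorAndShutDown(int messageCount) {
        BlockingQueue<String> consumed = new LinkedBlockingQueue<>();
        for (int i = 0; i < messageCount; i++) {
            consumed.add("message-" + i);
        }

        SketchProducer producer = new SketchProducer();
        AtomicBoolean consumerRunning = new AtomicBoolean(true);
        CountDownLatch mirroringDone = new CountDownLatch(1);

        Thread mirroringThread = new Thread(() -> {
            try {
                // Keep forwarding until the consumer has stopped AND its buffer is drained.
                while (consumerRunning.get() || !consumed.isEmpty()) {
                    String message = consumed.poll(10, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        producer.send(message);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                mirroringDone.countDown();
            }
        });
        mirroringThread.start();

        try {
            // Step 1: stop the consumer side so that no new data arrives.
            consumerRunning.set(false);
            // Step 2: wait until the mirroring thread has forwarded everything it consumed.
            mirroringDone.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for mirroring to finish", e);
        }
        // Step 3: only now is it safe to close the producer.
        producer.close();
        return producer.sentCount();
    }

    public static void main(String[] args) {
        System.out.println("mirrored " + mirrorAndShutDown(100) + " of 100 messages");
    }
}
```

        If step 3 were moved ahead of step 2, the stand-in producer would reject the remaining sends, which mirrors the failure reported in the description.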

        Jun Rao added a comment -

        In MirroringThread.run, it's probably better to do the countdown even when we hit an exception.

        Neha Narkhede added a comment -

        Uploading an updated patch in which the shutdown latch is decremented in a finally block, to make sure the mirroring threads shut down even when they run into an error or exception.
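
        A minimal sketch of why the finally block matters, assuming a java.util.concurrent.CountDownLatch is what the shutdown path waits on. The class and method names below are hypothetical and do not come from the Kafka source. The worker fails immediately, yet the waiting side is still released because the countdown runs on every exit path.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; the names here are not taken from the Kafka source.
class LatchInFinallySketch {

    // Starts a worker that fails part-way through and reports whether the
    // side waiting on the latch was still released.
    static boolean shutdownCompletesDespiteFailure() {
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                // Simulated failure in the middle of mirroring.
                throw new IllegalStateException("simulated mirroring failure");
            } finally {
                // Runs on normal exit and on exceptions alike, so a waiter is never stranded.
                shutdownLatch.countDown();
            }
        });
        // Report the failure in one line instead of the default stack trace dump.
        worker.setUncaughtExceptionHandler((thread, error) ->
                System.err.println("worker failed: " + error.getMessage()));
        worker.start();

        try {
            // A bounded wait keeps this sketch from hanging if the countdown were missing.
            return shutdownLatch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("shutdown completed: " + shutdownCompletesDespiteFailure());
    }
}
```

        Without the finally block, the countdown after the failing statement would be skipped and the await would time out (or block indefinitely if it had no timeout).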

        Jun Rao added a comment -

        +1 on the new patch.

        Neha Narkhede added a comment -

        Thanks. Just committed the patch.


          People

          • Assignee:
            Neha Narkhede
            Reporter:
            Neha Narkhede
          • Votes:
            0
            Watchers:
            0
