Description
When a machine that is mirroring data off of another Kafka broker is shutdown, it runs into the following exception, effectively dropping data. The shutdown API needs to be fixed to first shutdown the consumer threads, drain all the data to the producer, and only then shutdown the producer.
FATAL kafka.server.EmbeddedConsumer - kafka.producer.async.QueueClosedException: Attempt to add event to a closed queue.kafka.producer.async.QueueClosedException: Attempt to add event to a closed queue.
at kafka.producer.async.AsyncProducer.send(AsyncProducer.scala:87)
at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
at scala.collection.immutable.List.foreach(List.scala:45)
at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:131)
at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:130)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:130)
at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
at kafka.producer.Producer.zkSend(Producer.scala:144)
at kafka.producer.Producer.send(Producer.scala:106)
at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:136)
at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:134)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)