From bd0461b7c03b231c85aa8611b0a9d1a970cf6365 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 21 Oct 2014 13:37:15 -0700 Subject: [PATCH 1/5] make mirror maker exit when one consumer/producer thread exits. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 39 +++++++++++++++-------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index b8698ee..06a1711 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -17,26 +17,26 @@ package kafka.tools -import kafka.utils.{SystemTime, Utils, CommandLineUtils, Logging} +import java.util.Random +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} +import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch, TimeUnit} + +import joptsimple.OptionParser import kafka.consumer._ -import kafka.serializer._ -import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer} import kafka.metrics.KafkaMetricsGroup - +import kafka.producer.{BaseProducer, NewShinyProducer, OldProducer} +import kafka.serializer._ +import kafka.utils._ import org.apache.kafka.clients.producer.ProducerRecord import scala.collection.JavaConversions._ -import joptsimple.OptionParser -import java.util.Random -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch} - object MirrorMaker extends Logging { private var connectors: Seq[ZookeeperConsumerConnector] = null private var consumerThreads: Seq[ConsumerThread] = null private var producerThreads: Seq[ProducerThread] = null + private val isCleanShutdown: AtomicBoolean = new AtomicBoolean(false) private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes) @@ -89,10 +89,10 @@ object MirrorMaker extends Logging { .ofType(classOf[String]) val blacklistOpt = parser.accepts("blacklist", - "Blacklist of topics to mirror.") - .withRequiredArg() - .describedAs("Java regex (String)") - .ofType(classOf[String]) + "Blacklist of topics to mirror.") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(classOf[String]) val helpOpt = parser.accepts("help", "Print this message.") @@ -173,6 +173,7 @@ object MirrorMaker extends Logging { } def cleanShutdown() { + isCleanShutdown.set(true) if (connectors != null) connectors.foreach(_.shutdown) if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown) if (producerThreads != null) { @@ -207,6 +208,10 @@ object MirrorMaker extends Logging { } else { Utils.abs(counter.getAndIncrement()) % numConsumers } + put(record, queueId) + } + + def put(record: ProducerRecord, queueId: Int) { val queue = queues(queueId) var putSucceed = false @@ -255,6 +260,9 @@ object MirrorMaker extends Logging { } finally { shutdownLatch.countDown() info("Consumer thread stopped") + // If it exits accidentally, stop the entire mirror maker. + if (isCleanShutdown.compareAndSet(false,true)) + System.exit(-1) } } @@ -296,13 +304,16 @@ object MirrorMaker extends Logging { } finally { shutdownComplete.countDown info("Producer thread stopped") + // If it exits accidentally, stop the entire mirror maker. + if (isCleanShutdown.compareAndSet(false,true)) + System.exit(-1) } } def shutdown { try { info("Producer thread " + threadName + " shutting down") - dataChannel.put(shutdownMessage) + dataChannel.put(shutdownMessage, threadId) } catch { case ie: InterruptedException => { -- 1.8.3.4 (Apple Git-47) From b7425f8c60081ac2a258dacded510c856f7bd985 Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 22 Oct 2014 15:04:30 -0700 Subject: [PATCH 2/5] Addressed Guozhang's comments. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 06a1711..21aa3bb 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -17,11 +17,6 @@ package kafka.tools -import java.util.Random -import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} -import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch, TimeUnit} - -import joptsimple.OptionParser import kafka.consumer._ import kafka.metrics.KafkaMetricsGroup import kafka.producer.{BaseProducer, NewShinyProducer, OldProducer} @@ -29,8 +24,14 @@ import kafka.serializer._ import kafka.utils._ import org.apache.kafka.clients.producer.ProducerRecord +import java.util.Random +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} +import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch, TimeUnit} + import scala.collection.JavaConversions._ +import joptsimple.OptionParser + object MirrorMaker extends Logging { private var connectors: Seq[ZookeeperConsumerConnector] = null @@ -261,8 +262,10 @@ object MirrorMaker extends Logging { shutdownLatch.countDown() info("Consumer thread stopped") // If it exits accidentally, stop the entire mirror maker. - if (isCleanShutdown.compareAndSet(false,true)) + if (isCleanShutdown.compareAndSet(false,true)) { + fatal("Consumer thread existed abnormally, stopping the whole mirror maker.") System.exit(-1) + } } } @@ -305,8 +308,10 @@ object MirrorMaker extends Logging { shutdownComplete.countDown info("Producer thread stopped") // If it exits accidentally, stop the entire mirror maker. - if (isCleanShutdown.compareAndSet(false,true)) + if (isCleanShutdown.compareAndSet(false,true)) { + fatal("Consumer thread existed abnormally, stopping the whole mirror maker.") System.exit(-1) + } } } -- 1.8.3.4 (Apple Git-47) From 12138d3cc94b7ca733850c6fabfede0933b0f8e0 Mon Sep 17 00:00:00 2001 From: jqin Date: Thu, 23 Oct 2014 16:18:21 -0700 Subject: [PATCH 3/5] Addressed Neha and Guzhang's comments. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 35 ++++++++++++++--------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 21aa3bb..6a7501e 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -37,7 +37,7 @@ object MirrorMaker extends Logging { private var connectors: Seq[ZookeeperConsumerConnector] = null private var consumerThreads: Seq[ConsumerThread] = null private var producerThreads: Seq[ProducerThread] = null - private val isCleanShutdown: AtomicBoolean = new AtomicBoolean(false) + private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false) private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes) @@ -174,7 +174,7 @@ object MirrorMaker extends Logging { } def cleanShutdown() { - isCleanShutdown.set(true) + isShuttingdown.set(true) if (connectors != null) connectors.foreach(_.shutdown) if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown) if (producerThreads != null) { @@ -184,10 +184,10 @@ object MirrorMaker extends Logging { info("Kafka mirror maker shutdown successfully") } - class DataChannel(capacity: Int, numProducers: Int, numConsumers: Int) extends KafkaMetricsGroup { + class DataChannel(capacity: Int, numInputs: Int, numOutputs: Int) extends KafkaMetricsGroup { - val queues = new Array[BlockingQueue[ProducerRecord]](numConsumers) - for (i <- 0 until numConsumers) + val queues = new Array[BlockingQueue[ProducerRecord]](numOutputs) + for (i <- 0 until numOutputs) queues(i) = new ArrayBlockingQueue[ProducerRecord](capacity) private val counter = new AtomicInteger(new Random().nextInt()) @@ -205,9 +205,9 @@ object MirrorMaker extends Logging { // Otherwise use the queue based on the key value so that same key-ed messages go to the same queue val queueId = if(record.key() != null) { - Utils.abs(java.util.Arrays.hashCode(record.key())) % numConsumers + Utils.abs(java.util.Arrays.hashCode(record.key())) % numOutputs } else { - Utils.abs(counter.getAndIncrement()) % numConsumers + Utils.abs(counter.getAndIncrement()) % numOutputs } put(record, queueId) } @@ -219,7 +219,7 @@ object MirrorMaker extends Logging { while (!putSucceed) { val startPutTime = SystemTime.nanoseconds putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS) - waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers) + waitPut.mark((SystemTime.nanoseconds - startPutTime) / numInputs) } channelSizeHist.update(queue.size) } @@ -230,7 +230,7 @@ object MirrorMaker extends Logging { while (data == null) { val startTakeTime = SystemTime.nanoseconds data = queue.poll(500, TimeUnit.MILLISECONDS) - waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers) + waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numOutputs) } channelSizeHist.update(queue.size) data @@ -244,6 +244,7 @@ object MirrorMaker extends Logging { private val shutdownLatch = new CountDownLatch(1) private val threadName = "mirrormaker-consumer-" + threadId + private var isCleanShutdown: Boolean = true this.logIdent = "[%s] ".format(threadName) this.setName(threadName) @@ -256,15 +257,18 @@ object MirrorMaker extends Logging { mirrorDataChannel.put(data) } } catch { - case e: Throwable => + case e: Throwable => { fatal("Stream unexpectedly exited.", e) + isCleanShutdown = false + } } finally { shutdownLatch.countDown() info("Consumer thread stopped") // If it exits accidentally, stop the entire mirror maker. - if (isCleanShutdown.compareAndSet(false,true)) { + if (!isCleanShutdown) { fatal("Consumer thread existed abnormally, stopping the whole mirror maker.") - System.exit(-1) + if (isShuttingdown.get() == false) + System.exit(-1) } } } @@ -284,6 +288,7 @@ object MirrorMaker extends Logging { val threadId: Int) extends Thread with Logging with KafkaMetricsGroup { private val threadName = "mirrormaker-producer-" + threadId private val shutdownComplete: CountDownLatch = new CountDownLatch(1) + private var isCleanShutdown: Boolean = true this.logIdent = "[%s] ".format(threadName) setName(threadName) @@ -303,13 +308,15 @@ object MirrorMaker extends Logging { } catch { case t: Throwable => { fatal("Producer thread failure due to ", t) + isCleanShutdown = false } } finally { shutdownComplete.countDown info("Producer thread stopped") // If it exits accidentally, stop the entire mirror maker. - if (isCleanShutdown.compareAndSet(false,true)) { - fatal("Consumer thread existed abnormally, stopping the whole mirror maker.") + if (!isCleanShutdown) { + if (isShuttingdown.get() == false) + fatal("Consumer thread existed abnormally, stopping the whole mirror maker.") System.exit(-1) } } -- 1.8.3.4 (Apple Git-47) From 1436bc8477c19650e55766a3f8cc8d768491ce50 Mon Sep 17 00:00:00 2001 From: jqin Date: Fri, 24 Oct 2014 00:38:52 -0700 Subject: [PATCH 4/5] Incorporated Joel and Neha's comments. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 6a7501e..3ed66af 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -266,8 +266,8 @@ object MirrorMaker extends Logging { info("Consumer thread stopped") // If it exits accidentally, stop the entire mirror maker. if (!isCleanShutdown) { - fatal("Consumer thread existed abnormally, stopping the whole mirror maker.") - if (isShuttingdown.get() == false) + fatal("Consumer thread exited abnormally, stopping the whole mirror maker.") + if (!isShuttingdown.get()) System.exit(-1) } } @@ -315,9 +315,9 @@ object MirrorMaker extends Logging { info("Producer thread stopped") // If it exits accidentally, stop the entire mirror maker. if (!isCleanShutdown) { - if (isShuttingdown.get() == false) - fatal("Consumer thread existed abnormally, stopping the whole mirror maker.") - System.exit(-1) + fatal("Producer thread exited abnormally, stopping the whole mirror maker.") + if (!isShuttingdown.get()) + System.exit(-1) } } } -- 1.8.3.4 (Apple Git-47) From 934e7da0ff6a832325a8f1b7775969b63d7c4415 Mon Sep 17 00:00:00 2001 From: jqin Date: Fri, 24 Oct 2014 00:55:27 -0700 Subject: [PATCH 5/5] Incorporated Joel and Neha's comments. Also fixed a potential race where cleanShutdown could execute multiple times if several threads exit abnormally at same time. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 3ed66af..f399105 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -174,14 +174,15 @@ object MirrorMaker extends Logging { } def cleanShutdown() { - isShuttingdown.set(true) - if (connectors != null) connectors.foreach(_.shutdown) - if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown) - if (producerThreads != null) { - producerThreads.foreach(_.shutdown) - producerThreads.foreach(_.awaitShutdown) + if (isShuttingdown.compareAndSet(false, true)) { + if (connectors != null) connectors.foreach(_.shutdown) + if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown) + if (producerThreads != null) { + producerThreads.foreach(_.shutdown) + producerThreads.foreach(_.awaitShutdown) + } + info("Kafka mirror maker shutdown successfully") } - info("Kafka mirror maker shutdown successfully") } class DataChannel(capacity: Int, numInputs: Int, numOutputs: Int) extends KafkaMetricsGroup { @@ -267,8 +268,7 @@ object MirrorMaker extends Logging { // If it exits accidentally, stop the entire mirror maker. if (!isCleanShutdown) { fatal("Consumer thread exited abnormally, stopping the whole mirror maker.") - if (!isShuttingdown.get()) - System.exit(-1) + System.exit(-1) } } } @@ -316,8 +316,7 @@ object MirrorMaker extends Logging { // If it exits accidentally, stop the entire mirror maker. if (!isCleanShutdown) { fatal("Producer thread exited abnormally, stopping the whole mirror maker.") - if (!isShuttingdown.get()) - System.exit(-1) + System.exit(-1) } } } -- 1.8.3.4 (Apple Git-47)