From 7f027ec53759398b111696698786dadef79ff858 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 21 Oct 2014 13:37:15 -0700 Subject: [PATCH] make mirror maker exit when one consumer/producer thread exits. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 39 +++++++++++++++-------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index b8698ee..06a1711 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -17,26 +17,26 @@ package kafka.tools -import kafka.utils.{SystemTime, Utils, CommandLineUtils, Logging} +import java.util.Random +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} +import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch, TimeUnit} + +import joptsimple.OptionParser import kafka.consumer._ -import kafka.serializer._ -import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer} import kafka.metrics.KafkaMetricsGroup - +import kafka.producer.{BaseProducer, NewShinyProducer, OldProducer} +import kafka.serializer._ +import kafka.utils._ import org.apache.kafka.clients.producer.ProducerRecord import scala.collection.JavaConversions._ -import joptsimple.OptionParser -import java.util.Random -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch} - object MirrorMaker extends Logging { private var connectors: Seq[ZookeeperConsumerConnector] = null private var consumerThreads: Seq[ConsumerThread] = null private var producerThreads: Seq[ProducerThread] = null + private val isCleanShutdown: AtomicBoolean = new AtomicBoolean(false) private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes) @@ -89,10 +89,10 @@ object MirrorMaker extends Logging { .ofType(classOf[String]) val blacklistOpt = parser.accepts("blacklist", - "Blacklist of topics to mirror.") - .withRequiredArg() - .describedAs("Java regex (String)") - .ofType(classOf[String]) + "Blacklist of topics to mirror.") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(classOf[String]) val helpOpt = parser.accepts("help", "Print this message.") @@ -173,6 +173,7 @@ object MirrorMaker extends Logging { } def cleanShutdown() { + isCleanShutdown.set(true) if (connectors != null) connectors.foreach(_.shutdown) if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown) if (producerThreads != null) { @@ -207,6 +208,10 @@ object MirrorMaker extends Logging { } else { Utils.abs(counter.getAndIncrement()) % numConsumers } + put(record, queueId) + } + + def put(record: ProducerRecord, queueId: Int) { val queue = queues(queueId) var putSucceed = false @@ -255,6 +260,9 @@ object MirrorMaker extends Logging { } finally { shutdownLatch.countDown() info("Consumer thread stopped") + // If it exits accidentally, stop the entire mirror maker. + if (isCleanShutdown.compareAndSet(false,true)) + System.exit(-1) } } @@ -296,13 +304,16 @@ object MirrorMaker extends Logging { } finally { shutdownComplete.countDown info("Producer thread stopped") + // If it exits accidentally, stop the entire mirror maker. + if (isCleanShutdown.compareAndSet(false,true)) + System.exit(-1) } } def shutdown { try { info("Producer thread " + threadName + " shutting down") - dataChannel.put(shutdownMessage) + dataChannel.put(shutdownMessage, threadId) } catch { case ie: InterruptedException => { -- 1.8.3.4 (Apple Git-47)