diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 12fa797..90781d2 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -36,6 +36,8 @@ object MirrorMaker extends Logging { private var consumerThreads: Seq[ConsumerThread] = null private var producerThreads: ListBuffer[ProducerThread] = null + private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes) + def main(args: Array[String]) { info ("Starting mirror maker") @@ -159,9 +161,11 @@ object MirrorMaker extends Logging { consumerThreads.foreach(_.start) producerThreads.foreach(_.start) - // in case the consumer threads hit a timeout/other exception - consumerThreads.foreach(_.awaitShutdown) - cleanShutdown() + // we wait on producer's shutdown latch instead of consumers + // since the consumer threads can hit a timeout/other exception; + // but in this case the producer should still be able to shutdown + // based on the shutdown message in the channel + producerThreads.foreach(_.awaitShutdown) } def cleanShutdown() { @@ -187,7 +191,7 @@ object MirrorMaker extends Logging { this.setName(threadName) override def run() { - info("Starting mirror maker thread " + threadName) + info("Starting mirror maker consumer thread " + threadName) try { for (msgAndMetadata <- stream) { // If the key of the message is empty, put it into the universal channel @@ -208,59 +212,60 @@ object MirrorMaker extends Logging { fatal("Stream unexpectedly exited.", e) } finally { shutdownLatch.countDown() - info("Stopped thread.") + info("Consumer thread stopped") } } def awaitShutdown() { try { shutdownLatch.await() + info("Consumer thread shutdown complete") } catch { - case e: InterruptedException => fatal("Shutdown of thread %s interrupted. This might leak data!".format(threadName)) + case e: InterruptedException => fatal("Shutdown of the consumer thread interrupted. This might leak data!") } } } class ProducerThread (val dataChannel: BlockingQueue[ProducerRecord], val producer: BaseProducer, - val threadId: Int) extends Thread { - val threadName = "mirrormaker-producer-" + threadId - val logger = org.apache.log4j.Logger.getLogger(classOf[KafkaMigrationTool.ProducerThread].getName) - val shutdownComplete: CountDownLatch = new CountDownLatch(1) - - private final val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes) + val threadId: Int) extends Thread with Logging { + private val threadName = "mirrormaker-producer-" + threadId + private val shutdownComplete: CountDownLatch = new CountDownLatch(1) + this.logIdent = "[%s] ".format(threadName) setName(threadName) override def run { + info("Starting mirror maker producer thread " + threadName) try { while (true) { val data: ProducerRecord = dataChannel.take - logger.trace("Sending message with value size %d".format(data.value().size)) + trace("Sending message with value size %d".format(data.value().size)) if(data eq shutdownMessage) { - logger.info("Producer thread " + threadName + " finished running") + info("Received shutdown message") return } producer.send(data.topic(), data.key(), data.value()) } } catch { case t: Throwable => { - logger.fatal("Producer thread failure due to ", t) + fatal("Producer thread failure due to ", t) } } finally { shutdownComplete.countDown + info("Producer thread stopped") } } def shutdown { try { - logger.info("Producer thread " + threadName + " shutting down") + info("Producer thread " + threadName + " shutting down") dataChannel.put(shutdownMessage) } catch { case ie: InterruptedException => { - logger.warn("Interrupt during shutdown of ProducerThread", ie) + warn("Interrupt during shutdown of ProducerThread") } } } @@ -269,10 +274,10 @@ object MirrorMaker extends Logging { try { shutdownComplete.await producer.close - logger.info("Producer thread " + threadName + " shutdown complete") + info("Producer thread shutdown complete") } catch { case ie: InterruptedException => { - logger.warn("Interrupt during shutdown of ProducerThread") + warn("Shutdown of the consumer thread interrupted") } } }