From 2c4377a2b7ab95872b477be7923fc84ead4cef29 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 17 Mar 2015 13:46:22 -0700 Subject: [PATCH] move shutdown hook registration before creating producer and consumer streams, so mirror maker can be shutdown correctly if error occur during consumer stream creation. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 87b925c..11acc31 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -170,6 +170,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() val numStreams = options.valueOf(numStreamsOpt).intValue() + Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") { + override def run() { + cleanShutdown() + } + }) + // create producer val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) // Defaults to no data loss settings. @@ -255,12 +261,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } - Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") { - override def run() { - cleanShutdown() - } - }) - mirrorMakerThreads.foreach(_.start()) mirrorMakerThreads.foreach(_.awaitShutdown()) } -- 1.8.3.4 (Apple Git-47)