diff --git core/src/main/scala/kafka/tools/MirrorMaker.scala core/src/main/scala/kafka/tools/MirrorMaker.scala
index 3438f2c..5727d36 100644
--- core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -96,15 +96,19 @@ object MirrorMaker extends Logging {
       new Producer[Null, Message](config)
     })
 
-    val threads = {
-      val connectors = options.valuesOf(consumerConfigOpt).toList
-              .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
-              .map(new ZookeeperConsumerConnector(_))
+    val connectors = options.valuesOf(consumerConfigOpt).toList
+            .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
+            .map(new ZookeeperConsumerConnector(_))
+
+    def cleanShutdown() {
+      connectors.foreach(_.shutdown())
+      producers.foreach(_.close())
+    }
 
+    val threads = {
       Runtime.getRuntime.addShutdownHook(new Thread() {
         override def run() {
-          connectors.foreach(_.shutdown())
-          producers.foreach(_.close())
+          cleanShutdown()
         }
       })
 
@@ -113,17 +117,26 @@ object MirrorMaker extends Logging {
       else
         new Blacklist(options.valueOf(blacklistOpt))
 
-      val streams =
-        connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue()))
+      try {
+        val streams =
+          connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue()))
 
-      streams.flatten.zipWithIndex.map(streamAndIndex => {
-        new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2)
-      })
+        streams.flatten.zipWithIndex.map(streamAndIndex => {
+          new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2)
+        })
+      }
+      catch {
+        case t: Throwable =>
+          error("Unable to create streams, shutting down: " + t.getMessage, t)
+          Nil
+      }
     }
 
     threads.foreach(_.start())
 
     threads.foreach(_.awaitShutdown())
+
+    cleanShutdown()
   }
 
   class MirrorMakerThread(stream: KafkaStream[Message],
@@ -138,6 +151,7 @@ object MirrorMaker extends Logging {
     this.setName(threadName)
 
     override def run() {
+      info("Starting mirror maker thread " + threadName)
       try {
         for (msgAndMetadata <- stream) {
           val producer = producerSelector.next()
