diff --git core/src/main/scala/kafka/consumer/ConsoleConsumer.scala core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index bddbb2b..388623f 100644
--- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -27,6 +27,60 @@ import java.io.PrintStream
 import kafka.message._
 import kafka.utils.{Utils, Logging}
 import kafka.utils.ZKStringSerializer
+import kafka.consumer.ConsoleConsumer.MessageFormatter
+
+class ConsoleConsumer(private val consumerConfig: ConsumerConfig,
+                      private val formatter: MessageFormatter,
+                      private val maxMessages: Int,
+                      private val skipMessageOnError: Boolean)
+        extends ConsumerAgent(consumerConfig) with Logging {
+
+  private val lock = new Object()
+  private var numMessages = 0
+
+  override def processMessage(topic: String, message: Message) {
+    lock synchronized {
+      if (maxMessages < 0 || (maxMessages > 0 && numMessages < maxMessages)) {
+        try {
+          formatter.writeTo(message, System.out)
+        } catch {
+          case e =>
+            if (skipMessageOnError)
+              error("Error processing message (topic %s), skipping: "
+                            .format(topic), e)
+            else
+              throw e
+        }
+
+        if (maxMessages > 0) {
+          numMessages += 1
+          if (numMessages >= maxMessages) {
+            info("Consumed %d messages.".format(numMessages))
+            shutdown()
+          }
+        }
+      }
+
+      if(System.out.checkError()) {
+        // Shutdown since no one is listening to our output stream any more
+        System.err.println(
+          "Unable to write to standard out, closing console consumer.")
+        shutdown()
+      }
+
+    }
+  }
+
+  override def afterStoppingWorkerThread() {
+    System.out.flush()
+    formatter.close()
+  }
+
+  override def afterShutdown() {
+    info("Shut down console consumer.")
+  }
+
+}
 
 /**
  * Consumer that dumps messages out to standard out.
@@ -36,11 +90,13 @@ object ConsoleConsumer extends Logging {
 
   def main(args: Array[String]) {
     val parser = new OptionParser
-    val topicIdOpt = parser.accepts("topic", "REQUIRED: The topic id to consume on.")
+    val topicIdsOpt = parser.accepts("topic",
+                           "REQUIRED: The list of topic ids to consume on. " +
+                           "(This option also accepts standard Java regex patterns).")
                            .withRequiredArg
                            .describedAs("topic")
                            .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + 
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
                                       "Multiple URLS can be given to allow fail-over.")
                            .withRequiredArg
                            .describedAs("urls")
@@ -48,13 +104,13 @@ object ConsoleConsumer extends Logging {
     val groupIdOpt = parser.accepts("group", "The group id to consume on.")
                            .withRequiredArg
                            .describedAs("gid")
-                           .defaultsTo("console-consumer-" + new Random().nextInt(100000))   
+                           .defaultsTo("console-consumer-" + new Random().nextInt(100000))
                            .ofType(classOf[String])
     val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
                            .withRequiredArg
                            .describedAs("size")
                            .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1024 * 1024)   
+                           .defaultsTo(1024 * 1024)
     val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
                            .withRequiredArg
                            .describedAs("size")
@@ -90,7 +146,7 @@ object ConsoleConsumer extends Logging {
         "skip it instead of halt.")
 
     val options: OptionSet = tryParse(parser, args)
-    checkRequiredArgs(parser, options, topicIdOpt, zkConnectOpt)
+    Utils.checkRequiredArgs(parser, options, zkConnectOpt, topicIdsOpt)
     
     val props = new Properties()
     props.put("groupid", options.valueOf(groupIdOpt))
@@ -101,65 +157,35 @@ object ConsoleConsumer extends Logging {
     props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
     props.put("zk.connect", options.valueOf(zkConnectOpt))
     props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString)
+    props.put("mirror.topics.whitelist", options.valueOf(topicIdsOpt))
+    // (if no topic list were given, all topics would be whitelisted - see ConsumerConfig)
     val config = new ConsumerConfig(props)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     
-    val topic = options.valueOf(topicIdOpt)
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
     
     val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
+    require(maxMessages == -1 || maxMessages > 0)
 
-    val connector = Consumer.create(config)
+    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+    formatter.init(formatterArgs)
+
+    val consoleConsumerInstance = new ConsoleConsumer(config, formatter,
+                                              maxMessages, skipMessageOnError)
 
     if(options.has(resetBeginningOpt))
       tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt))
 
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
-        connector.shutdown()
-        // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
-        if(!options.has(groupIdOpt))  
-          tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt))
+        tryCleanupZookeeper(options.valueOf(zkConnectOpt),
+                            options.valueOf(groupIdOpt))
       }
     })
-    
-    var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
-    val iter =
-      if(maxMessages >= 0)
-        stream.slice(0, maxMessages)
-      else
-        stream
-
-    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
-    formatter.init(formatterArgs)
 
-    try {
-      for(message <- iter) {
-        try {
-          formatter.writeTo(message, System.out)
-        } catch {
-          case e =>
-            if (skipMessageOnError)
-              error("Error processing message, skipping this message: ", e)
-            else
-              throw e
-        }
-        if(System.out.checkError()) { 
-          // This means no one is listening to our output stream any more, time to shutdown
-          System.err.println("Unable to write to standard out, closing consumer.")
-          formatter.close()
-          connector.shutdown()
-          System.exit(1)
-        }
-      }
-    } catch {
-      case e => error("Error processing message, stopping consumer: ", e)
-    }
-      
-    System.out.flush()
-    formatter.close()
-    connector.shutdown()
+    consoleConsumerInstance.startup()
+    consoleConsumerInstance.waitForShutdown()
   }
 
   def tryParse(parser: OptionParser, args: Array[String]) = {
@@ -173,16 +199,6 @@ object ConsoleConsumer extends Logging {
     }
   }
   
-  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
-    for(arg <- required) {
-      if(!options.has(arg)) {
-        error("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-  }
-  
   def tryParseFormatterArgs(args: Iterable[String]): Properties = {
     val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
     if(!splits.forall(_.length == 2)) {
@@ -237,5 +253,5 @@ object ConsoleConsumer extends Logging {
       case _ => // swallow
     }
   }
-   
 }
+
diff --git core/src/main/scala/kafka/consumer/ConsumerAgent.scala core/src/main/scala/kafka/consumer/ConsumerAgent.scala
new file mode 100644
index 0000000..5b91f6c
--- /dev/null
+++ core/src/main/scala/kafka/consumer/ConsumerAgent.scala
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import kafka.utils.{Utils, Logging}
+import kafka.message.Message
+import kafka.common.ConsumerRebalanceFailedException
+import org.apache.log4j.Logger
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicBoolean
+
+abstract class ConsumerAgent(private val consumerConfig: ConsumerConfig)
+        extends TopicEventHandler[String] {
+
+  private val _logger = Logger.getLogger(this.getClass.getName)
+
+  val ident = "[%s @ %s]"
+          .format(consumerConfig.groupId, consumerConfig.zkConnect)
+
+  private val shutdownRequested = new AtomicBoolean(false)
+  private val handlingTopicEvent = new AtomicBoolean(false)
+
+  private val shutdownLatch = new CountDownLatch(1)
+  private var workersStoppedLatch = new CountDownLatch(0)
+
+  def processMessage(t: String, m: Message)
+
+  def afterStoppingWorkerThread()
+
+  def afterShutdown()
+
+  _logger.info("%s Whitelist = %s".format(
+    ident, consumerConfig.mirrorTopicsWhitelist))
+  _logger.info("%s Blacklist = %s".format(
+    ident, consumerConfig.mirrorTopicsBlackList))
+
+  private class WorkerThread (
+                      private val messageStream: KafkaMessageStream[Message],
+                      private val topic: String,
+                      private val threadId: Int) extends Thread with Logging {
+
+    private val name = "kafka-consumeragent-%s-%d".format(topic, threadId)
+
+    this.setDaemon(false)
+    this.setName(name)
+
+    override def run() {
+      info("%s Started worker thread %s".format(ident, name))
+
+      try {
+        val it = messageStream.iterator()
+        Stream.continually(
+          if (!handlingTopicEvent.get() && it.hasNext()) it.next() else null)
+                .takeWhile(item => item != null).foreach {
+          message =>
+            trace("%s Worker thread %s received message %d"
+                          .format(ident, name, message.checksum))
+            processMessage(topic, message)
+          }
+      }
+      catch {
+        case e: ConsumerTimeoutException =>
+          info("%s Worker thread %s consumer timed out.".format(ident, name))
+          shutdown()
+        case e =>
+          fatal("%s Worker thread %s %s stream %d unexpectedly exited"
+                        .format(ident, name, topic, threadId), e)
+          shutdown()
+      } finally {
+        /*
+         * Should not call shutdown here (which would lead to system exit) - we
+         * may be here due to a topicEvent in which case new worker threads
+         * will need to be created.
+         */
+        afterStoppingWorkerThread()
+        workersStoppedLatch.countDown()
+        info("%s Stopped worker thread %s".format(ident, name))
+      }
+    }
+
+  }
+
+  private var workerThreads = List[WorkerThread]()
+
+  // should be accessed by handleTopicEvent only
+  private var topics: Seq[String] = List()
+
+  private var consumerConnector: ConsumerConnector = null
+  private var topicEventWatcher: ZookeeperTopicEventWatcher = null
+
+  private def isTopicAllowed(topic: String) = {
+    val allowed = if (consumerConfig.mirrorTopicsBlackList.nonEmpty)
+      !topic.matches(consumerConfig.mirrorTopicsBlackList)
+    else
+      topic.matches(consumerConfig.mirrorTopicsWhitelist)
+    _logger.debug("%s %s %s".format(
+      ident, topic, if (allowed) "allowed" else "filtered"))
+
+    allowed
+  }
+
+  private def startNewWorkerThreads(topicsToConsume: Seq[String]) {
+    // wait for previous worker threads to finish up
+    workersStoppedLatch.await()
+    val topicMap = makeTopicMap(topicsToConsume)
+    if (topicMap.nonEmpty) {
+      try {
+        consumerConnector = Consumer.create(consumerConfig)
+        val topicMessageStreams =
+          consumerConnector.createMessageStreams(topicMap)
+
+        workersStoppedLatch = new CountDownLatch(topicMessageStreams.size)
+
+        for ((topic, streamList) <- topicMessageStreams)
+          for (i <- 0 until streamList.length)
+            workerThreads ::= new WorkerThread(streamList(i), topic, i)
+        workerThreads.foreach(_.start())
+      }
+      catch {
+        case e: ConsumerRebalanceFailedException =>
+          _logger.fatal(ident + " Cannot rebalance; proceeding to shutdown", e)
+          shutdown()
+      }
+    }
+    else {
+      _logger.info(ident + " No topics to consume.")
+      /*
+       * Since we want to consume from topics that match the topic event
+       * watcher's whitelist, we should just wait indefinitely. If we
+       * decide to exit instead if there are no topics to consume then we can
+       * just countdown the shutdownLatch here.
+       */
+    }
+  }
+
+  @Override
+  def handleTopicEvent(allTopics: Seq[String]) {
+    if (!handlingTopicEvent.getAndSet(true))
+    {
+      val updatedTopics = allTopics.filter(isTopicAllowed)
+
+      val addedTopics = updatedTopics filterNot (topics contains)
+      if (addedTopics.nonEmpty)
+        _logger.info("%s Topic event: added topics = %s"
+                             .format(ident, addedTopics))
+
+      val deletedTopics = topics filterNot (updatedTopics contains)
+      if (deletedTopics.nonEmpty)
+        _logger.info("%s Topic event: deleted topics = %s"
+                             .format(ident, deletedTopics))
+
+      topics = updatedTopics
+      _logger.info("%s Topics to consume = %s".format(ident, topics))
+
+      if (addedTopics.nonEmpty || deletedTopics.nonEmpty) {
+        if (consumerConnector != null)
+          consumerConnector.shutdown()
+
+        workerThreads = Nil
+
+        workersStoppedLatch.await()
+
+        handlingTopicEvent.set(false)
+
+        startNewWorkerThreads(topics)
+      }
+      else
+        handlingTopicEvent.set(false)
+    }
+  }
+
+  private def stopWatcherAndConnector() {
+    if (topicEventWatcher != null) {
+      topicEventWatcher.shutdown()
+    }
+
+    if (consumerConnector != null) {
+      consumerConnector.shutdown()
+    }
+  }
+
+  def shutdown() {
+    if (!shutdownRequested.getAndSet(true)) {
+      stopWatcherAndConnector()
+      shutdownLatch.countDown()
+    }
+  }
+
+  def startup() {
+    if (isTrivialRegex(consumerConfig.mirrorTopicsWhitelist)) {
+      _logger.info(
+        "%s Trivial whitelist list (%s) - topic event watcher is not required."
+                .format(ident, consumerConfig.mirrorTopicsWhitelist))
+      startNewWorkerThreads(
+        consumerConfig.mirrorTopicsWhitelist.split(",").toList)
+    }
+    else {
+      _logger.info("%s Non-trivial whitelist (%s) - creating topic event watcher."
+                           .format(ident, consumerConfig.mirrorTopicsWhitelist))
+      topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this)
+    }
+
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run() { stopWatcherAndConnector() }
+    })
+  }
+
+  def waitForShutdown() {
+    shutdownLatch.await()
+
+    workersStoppedLatch.await()
+
+    afterShutdown()
+  }
+
+  private def makeTopicMap(mirrorTopics: Seq[String]) = {
+    if (mirrorTopics.nonEmpty)
+      Utils.getConsumerTopicMap(mirrorTopics.mkString(
+        "", ":%d,".format(consumerConfig.mirrorConsumerNumThreads),
+        ":%d".format(consumerConfig.mirrorConsumerNumThreads)))
+    else
+      Utils.getConsumerTopicMap("")
+  }
+
+  private def isTrivialRegex(regex: String) = regex.matches("[\\p{Alnum}-|]+")
+}
+
diff --git core/src/main/scala/kafka/consumer/ConsumerConfig.scala core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index c2ac746..f88768c 100644
--- core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -21,6 +21,8 @@ import java.util.Properties
 import kafka.utils.{ZKConfig, Utils}
 import kafka.api.OffsetRequest
 import kafka.common.InvalidConfigException
+import java.util.regex.PatternSyntaxException
+
 object ConsumerConfig {
   val SocketTimeout = 30 * 1000
   val SocketBufferSize = 64*1024
@@ -90,18 +92,44 @@ class ConsumerConfig(props: Properties) extends ZKConfig(props) {
   /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
   val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
 
-  /** Whitelist of topics for this mirror's embedded consumer to consume. At
-   *  most one of whitelist/blacklist may be specified. */
-  val mirrorTopicsWhitelist = Utils.getString(
-    props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist)
- 
-  /** Topics to skip mirroring. At most one of whitelist/blacklist may be
-   *  specified */
-  val mirrorTopicsBlackList = Utils.getString(
-    props, MirrorTopicsBlacklistProp, MirrorTopicsBlacklist)
-
-  if (mirrorTopicsWhitelist.nonEmpty && mirrorTopicsBlackList.nonEmpty)
-      throw new InvalidConfigException("The embedded consumer's mirror topics configuration can only contain one of blacklist or whitelist")
+
+  /**
+   * Whitelist/blacklist of topics to allow/disallow for mirroring. At most one
+   * of whitelist/blacklist may be specified. For convenience, comma-separated
+   * whitelist/blacklist is allowed. We replace commas with the "logical or"
+   * pattern selector.
+   */
+  val mirrorTopicsBlackList = {
+    val given = Utils.getString(
+      props, MirrorTopicsBlacklistProp, MirrorTopicsBlacklist)
+    try {
+      Utils.checkAndSanitizeTopicRegex(given)
+    }
+    catch {
+      case e: PatternSyntaxException =>
+        throw new InvalidConfigException("Blacklist %s is an invalid regex."
+                                                 .format(given))
+    }
+  }
+
+  val mirrorTopicsWhitelist = {
+    val given = Utils.getString(
+        props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist)
+    try {
+      val whitelist = Utils.checkAndSanitizeTopicRegex(given)
+
+      if (whitelist.nonEmpty && mirrorTopicsBlackList.nonEmpty)
+        throw new InvalidConfigException("The embedded consumer's mirror " +
+          "topics configuration can only contain one of blacklist or whitelist")
+
+      if (whitelist.isEmpty) ".+" else whitelist
+    }
+    catch {
+      case e: PatternSyntaxException =>
+        throw new InvalidConfigException("Whitelist %s is an invalid regex."
+                                                 .format(given))
+    }
+  }
 
   val mirrorConsumerNumThreads = Utils.getInt(
     props, MirrorConsumerNumThreadsProp, MirrorConsumerNumThreads)
diff --git core/src/main/scala/kafka/consumer/ConsumerIterator.scala core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index cb95919..8d0677f 100644
--- core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -43,7 +43,7 @@ class ConsumerIterator[T](private val topic: String,
     if(consumedOffset < 0)
       throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
     currentTopicInfo.resetConsumeOffset(consumedOffset)
-    trace("Setting consumed offset to %d".format(consumedOffset))
+    trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
     ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
     decodedMessage
   }
diff --git core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index eb563e1..1552f18 100644
--- core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -21,11 +21,9 @@ import scala.collection.JavaConversions._
 import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import kafka.server.KafkaServerStartable
-import kafka.common.ConsumerRebalanceFailedException
 
 class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
-    val eventHandler: TopicEventHandler[String], kafkaServerStartable: KafkaServerStartable) extends Logging {
+    val eventHandler: TopicEventHandler[String]) extends Logging {
 
   val lock = new Object()
 
@@ -35,7 +33,7 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
   startWatchingTopicEvents()
 
   private def startWatchingTopicEvents() {
-    val topicEventListener = new ZkTopicEventListener(kafkaServerStartable)
+    val topicEventListener = new ZkTopicEventListener()
     ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
 
     zkClient.subscribeStateChanges(
@@ -62,7 +60,7 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
     }
   }
 
-  class ZkTopicEventListener(val kafkaServerStartable: KafkaServerStartable) extends IZkChildListener {
+  class ZkTopicEventListener extends IZkChildListener {
 
     @throws(classOf[Exception])
     def handleChildChange(parent: String, children: java.util.List[String]) {
@@ -76,11 +74,8 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
           }
         }
         catch {
-          case e: ConsumerRebalanceFailedException =>
-            fatal("can't rebalance in embedded consumer; proceed to shutdown", e)
-            kafkaServerStartable.shutdown()
           case e =>
-            error("error in handling child changes in embedded consumer", e)
+            error("error in handling child changes", e)
         }
       }
     }
diff --git core/src/main/scala/kafka/server/KafkaServerStartable.scala core/src/main/scala/kafka/server/KafkaServerStartable.scala
index 3e196e8..4ec30fa 100644
--- core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -24,6 +24,7 @@ import kafka.message.Message
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.Map
+import kafka.common.ConsumerRebalanceFailedException
 
 class KafkaServerStartable(val serverConfig: KafkaConfig,
                            val consumerConfig: ConsumerConfig,
@@ -37,6 +38,8 @@ class KafkaServerStartable(val serverConfig: KafkaConfig,
 
   private def init() {
     server = new KafkaServer(serverConfig)
+    // embedded consumer can be deprecated eventually - we should switch to
+    // using MirrorMaker tool
     if (consumerConfig != null)
       embeddedConsumer =
         new EmbeddedConsumer(consumerConfig, producerConfig, this)
@@ -117,7 +120,14 @@ class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
 
     if (addedTopics.nonEmpty || deletedTopics.nonEmpty) {
       info("mirror topics = %s".format(mirrorTopics))
-      startNewConsumerThreads(makeTopicMap(mirrorTopics))
+      try {
+        startNewConsumerThreads(makeTopicMap(mirrorTopics))
+      }
+      catch {
+        case e: ConsumerRebalanceFailedException =>
+          fatal("can't rebalance; proceed to shutdown", e)
+          kafkaServerStartable.shutdown()
+      }
     }
   }
 
@@ -161,7 +171,7 @@ class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
 
   def startup() {
     info("staring up embedded consumer")
-    topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this, kafkaServerStartable)
+    topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this)
     /*
     * consumer threads are (re-)started upon topic events (which includes an
     * initial startup event which lists the current topics)
diff --git core/src/main/scala/kafka/tools/MirrorMaker.scala core/src/main/scala/kafka/tools/MirrorMaker.scala
new file mode 100644
index 0000000..981361a
--- /dev/null
+++ core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import kafka.consumer.{ConsumerAgent, ConsumerConfig}
+import kafka.message.Message
+import joptsimple.OptionParser
+import kafka.utils.{Utils, Logging}
+import scala.collection.JavaConversions._
+import kafka.producer.{ProducerConfig, ProducerData, Producer}
+
+class MirrorMaker(private val consumerConfig: ConsumerConfig,
+                  private val producer: Producer[Null, Message])
+        extends ConsumerAgent(consumerConfig) with Logging {
+
+  override def processMessage(topic: String, message: Message) {
+    val pd = new ProducerData[Null, Message](topic, message)
+    producer.send(pd)
+  }
+
+  override def afterStoppingWorkerThread() {
+  }
+
+  override def afterShutdown() {
+    producer.close()
+    info("Shut down MirrorMaker.")
+  }
+}
+
+object MirrorMaker extends Logging {
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+
+    val consumerConfigOpt = parser.accepts("consumer-config",
+      "Consumer config to consume from a source cluster. " +
+      "You may specify multiple of these.")
+      .withRequiredArg()
+      .describedAs("config file")
+      .ofType(classOf[String])
+
+    val producerConfigOpt = parser.accepts("producer-config",
+      "Embedded producer config.")
+      .withRequiredArg()
+      .describedAs("config file")
+      .ofType(classOf[String])
+
+    val helpOpt = parser.accepts("help", "Print this message.")
+
+    val options = parser.parse(args : _*)
+
+    if (options.has(helpOpt)) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    Utils.checkRequiredArgs(
+      parser, options, consumerConfigOpt, producerConfigOpt)
+
+    val producer = {
+      val config = new ProducerConfig(
+        Utils.loadProps(options.valueOf(producerConfigOpt)))
+      new Producer[Null, Message](config)
+    }
+
+    val mirrorMakers: List[MirrorMaker] = options.valuesOf(consumerConfigOpt)
+            .toList.map(cfg => {
+      val consumerConfig = new ConsumerConfig(Utils.loadProps(cfg.toString))
+      new MirrorMaker(consumerConfig, producer)
+    })
+
+    mirrorMakers.foreach(_.startup())
+    mirrorMakers.foreach(_.waitForShutdown())
+  }
+
+}
+
diff --git core/src/main/scala/kafka/utils/Utils.scala core/src/main/scala/kafka/utils/Utils.scala
index 96d04b6..d8e2135 100644
--- core/src/main/scala/kafka/utils/Utils.scala
+++ core/src/main/scala/kafka/utils/Utils.scala
@@ -29,12 +29,14 @@ import scala.collection._
 import scala.collection.mutable
 import kafka.message.{NoCompressionCodec, CompressionCodec}
 import org.I0Itec.zkclient.ZkClient
+import joptsimple.{OptionSpec, OptionSet, OptionParser}
+import java.util.regex.Pattern
+
 
 /**
  * Helper functions!
  */
 object Utils extends Logging {
-  
   /**
    * Wrap the given function in a java.lang.Runnable
    * @param fun A function
@@ -615,6 +617,28 @@ object Utils extends Logging {
       case _ => // swallow
     }
   }
+
+  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
+    for(arg <- required) {
+      if(!options.has(arg)) {
+        error("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+  }
+
+  def checkAndSanitizeTopicRegex(pattern: String) = {
+    val sanitized = pattern
+            .trim
+            .replace(',', '|')
+            .replace(" ", "")
+            .replaceAll("^[\"\']+","")
+            .replaceAll("[\"\']+$","") // property files may bring quotes
+    Pattern.compile(sanitized)
+    sanitized
+  }
+
 }
 
 class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
diff --git system_test/embedded_consumer/bin/run-test.sh system_test/embedded_consumer/bin/run-test.sh
index e11fe27..36a3daa 100755
--- system_test/embedded_consumer/bin/run-test.sh
+++ system_test/embedded_consumer/bin/run-test.sh
@@ -14,8 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-readonly num_messages=400000
-readonly message_size=400
+readonly num_messages=4000
+readonly message_size=100
 readonly action_on_fail="proceed"
 
 readonly test_start_time="$(date +%s)"
@@ -125,7 +125,7 @@ shutdown_servers() {
 start_producer() {
     topic=$1
     info "start producing messages for topic $topic ..."
-    $base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
+    $base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
     pid_producer=$!
 }
 
diff --git system_test/mirror_maker/README system_test/mirror_maker/README
new file mode 100644
index 0000000..da53c14
--- /dev/null
+++ system_test/mirror_maker/README
@@ -0,0 +1,22 @@
+This test replicates messages from two source kafka clusters into one target
+kafka cluster using the mirror-maker tool.  At the end, the messages produced
+at the source brokers should match those at the target brokers.
+
+To run this test, do
+bin/run-test.sh
+
+In the event of failure, by default the brokers and zookeepers remain running
+to make it easier to debug the issue - hit Ctrl-C to shut them down. You can
+change this behavior by setting the action_on_fail flag in the script to "exit"
+or "proceed", in which case a snapshot of all the logs and directories is
+placed in the test's base directory.
+
+It is a good idea to run the test in a loop. E.g.:
+
+:>/tmp/mirrormaker_test.log
+for i in {1..10}; do echo "run $i"; ./bin/run-test.sh >> /tmp/mirrormaker_test.log 2>&1; done
+tail -F /tmp/mirrormaker_test.log
+
+grep -ic passed /tmp/mirrormaker_test.log
+grep -ic failed /tmp/mirrormaker_test.log
+
diff --git system_test/mirror_maker/bin/expected.out system_test/mirror_maker/bin/expected.out
new file mode 100644
index 0000000..0a1bbaf
--- /dev/null
+++ system_test/mirror_maker/bin/expected.out
@@ -0,0 +1,18 @@
+start the servers ...
+start producing messages ...
+wait for consumer to finish consuming ...
+[2011-05-17 14:49:11,605] INFO Creating async producer for broker id = 2 at localhost:9091 (kafka.producer.ProducerPool)
+[2011-05-17 14:49:11,606] INFO Creating async producer for broker id = 1 at localhost:9092 (kafka.producer.ProducerPool)
+[2011-05-17 14:49:11,607] INFO Creating async producer for broker id = 3 at localhost:9090 (kafka.producer.ProducerPool)
+thread 0: 400000 messages sent 3514012.1233 nMsg/sec 3.3453 MBs/sec
+[2011-05-17 14:49:34,382] INFO Closing all async producers (kafka.producer.ProducerPool)
+[2011-05-17 14:49:34,383] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+[2011-05-17 14:49:34,384] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+[2011-05-17 14:49:34,385] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+Total Num Messages: 400000 bytes: 79859641 in 22.93 secs
+Messages/sec: 17444.3960
+MB/sec: 3.3214
+test passed
+stopping the servers
+bin/../../../bin/zookeeper-server-start.sh: line 9: 22584 Terminated              $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
+bin/../../../bin/zookeeper-server-start.sh: line 9: 22585 Terminated              $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
diff --git system_test/mirror_maker/bin/run-test.sh system_test/mirror_maker/bin/run-test.sh
new file mode 100755
index 0000000..2297897
--- /dev/null
+++ system_test/mirror_maker/bin/run-test.sh
@@ -0,0 +1,355 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+readonly num_messages=10000
+readonly message_size=100
+readonly action_on_fail="proceed"
+# readonly action_on_fail="wait"
+
+readonly test_start_time="$(date +%s)"
+
+readonly base_dir=$(dirname $0)/..
+
+info() {
+    echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
+}
+
+kill_child_processes() {
+    isTopmost=$1
+    curPid=$2
+    childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')
+    for childPid in $childPids
+    do
+        kill_child_processes 0 $childPid
+    done
+    if [ $isTopmost -eq 0 ]; then
+        kill -15 $curPid 2> /dev/null
+    fi
+}
+
+cleanup() {
+    info "cleaning up"
+
+    pid_zk_source1=
+    pid_zk_source2=
+    pid_zk_target=
+    pid_kafka_source_1_1=
+    pid_kafka_source_1_2=
+    pid_kafka_source_2_1=
+    pid_kafka_source_2_2=
+    pid_kafka_target_1_1=
+    pid_kafka_target_1_2=
+    pid_producer=
+    pid_mirrormaker_1=
+    pid_mirrormaker_2=
+
+    rm -rf /tmp/zookeeper*
+
+    rm -rf /tmp/kafka*
+}
+
+begin_timer() {
+    t_begin=$(date +%s)
+}
+
+end_timer() {
+    t_end=$(date +%s)
+}
+
+start_zk() {
+    info "starting zookeepers"
+    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source_1.properties 2>&1 > $base_dir/zookeeper_source-1.log &
+    pid_zk_source1=$!
+    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source_2.properties 2>&1 > $base_dir/zookeeper_source-2.log &
+    pid_zk_source2=$!
+    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log &
+    pid_zk_target=$!
+}
+
+start_source_servers() {
+    info "starting source cluster"
+
+    JMX_PORT=1111 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_1_1.properties 2>&1 > $base_dir/kafka_source-1-1.log &
+    pid_kafka_source_1_1=$!
+    JMX_PORT=2222 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_1_2.properties 2>&1 > $base_dir/kafka_source-1-2.log &
+    pid_kafka_source_1_2=$!
+    JMX_PORT=3333 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_2_1.properties 2>&1 > $base_dir/kafka_source-2-1.log &
+    pid_kafka_source_2_1=$!
+    JMX_PORT=4444 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_2_2.properties 2>&1 > $base_dir/kafka_source-2-2.log &
+    pid_kafka_source_2_2=$!
+}
+
+start_target_servers() {
+    echo "starting mirror cluster"
+    JMX_PORT=5555 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target_1_1.properties 2>&1 > $base_dir/kafka_target-1-1.log &
+    pid_kafka_target_1_1=$!
+    JMX_PORT=6666 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target_1_2.properties 2>&1 > $base_dir/kafka_target-1-2.log &
+    pid_kafka_target_1_2=$!
+}
+
+shutdown_servers() {
+    info "stopping mirror-maker"
+    if [ "x${pid_mirrormaker_1}" != "x" ]; then kill_child_processes 0 ${pid_mirrormaker_1}; fi
+    if [ "x${pid_mirrormaker_2}" != "x" ]; then kill_child_processes 0 ${pid_mirrormaker_2}; fi
+
+    info "stopping producer"
+    if [ "x${pid_producer}" != "x" ]; then kill_child_processes 0 ${pid_producer}; fi
+
+    info "shutting down target servers"
+    if [ "x${pid_kafka_target_1_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target_1_1}; fi
+    if [ "x${pid_kafka_target_1_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target_1_2}; fi
+    sleep 2
+
+    info "shutting down source servers"
+    if [ "x${pid_kafka_source_1_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_1_1}; fi
+    if [ "x${pid_kafka_source_1_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_1_2}; fi
+    if [ "x${pid_kafka_source_2_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_2_1}; fi
+    if [ "x${pid_kafka_source_2_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_2_2}; fi
+
+    info "shutting down zookeeper servers"
+    if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi
+    if [ "x${pid_zk_source1}" != "x" ]; then kill_child_processes 0 ${pid_zk_source1}; fi
+    if [ "x${pid_zk_source2}" != "x" ]; then kill_child_processes 0 ${pid_zk_source2}; fi
+}
+
+start_producer() {
+    topic=$1
+    zk=$2
+    info "start producing messages for topic $topic to zookeeper $zk ..."
+    $base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo zk.connect=$zk --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
+    pid_producer=$!
+}
+
+# Usage: wait_partition_done ([kafka-server] [topic] [partition-id])+
+wait_partition_done() {
+    n_tuples=$(($# / 3))
+
+    i=1
+    while (($#)); do
+        kafka_server[i]=$1
+        topic[i]=$2
+        partitionid[i]=$3
+        prev_offset[i]=0
+        info "\twaiting for partition on server ${kafka_server[i]}, topic ${topic[i]}, partition ${partitionid[i]}"
+        i=$((i+1))
+        shift 3
+    done
+
+    all_done=0
+
+    # set -x
+    while [[ $all_done != 1 ]]; do
+        sleep 4
+        i=$n_tuples
+        all_done=1
+        for ((i=1; i <= $n_tuples; i++)); do
+            cur_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server ${kafka_server[i]} --topic ${topic[i]} --partition ${partitionid[i]} --time -1 --offsets 1 | tail -1)
+            if [ "x$cur_size" != "x${prev_offset[i]}" ]; then
+                all_done=0
+                prev_offset[i]=$cur_size
+            fi
+        done
+    done
+
+}
+
+cmp_logs() {
+    topic=$1
+    info "comparing source and target logs for topic $topic"
+    source_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    source_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    source_part2_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    source_part3_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    target_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    target_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9095 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    if [ "x$source_part0_size" == "x" ]; then source_part0_size=0; fi
+    if [ "x$source_part1_size" == "x" ]; then source_part1_size=0; fi
+    if [ "x$source_part2_size" == "x" ]; then source_part2_size=0; fi
+    if [ "x$source_part3_size" == "x" ]; then source_part3_size=0; fi
+    if [ "x$target_part0_size" == "x" ]; then target_part0_size=0; fi
+    if [ "x$target_part1_size" == "x" ]; then target_part1_size=0; fi
+    expected_size=$(($source_part0_size + $source_part1_size + $source_part2_size + $source_part3_size))
+    actual_size=$(($target_part0_size + $target_part1_size))
+    if [ "x$expected_size" != "x$actual_size" ]
+    then
+        info "source size: $expected_size target size: $actual_size"
+        return 1
+    else
+        return 0
+    fi
+}
+
+take_fail_snapshot() {
+    snapshot_dir="$base_dir/failed-${snapshot_prefix}-${test_start_time}"
+    mkdir $snapshot_dir
+    for dir in /tmp/zookeeper_source{1..2} /tmp/zookeeper_target /tmp/kafka-source-{1..2}-{1..2}-logs /tmp/kafka-target-1-{1..2}-logs; do
+        if [ -d $dir ]; then
+            cp -r $dir $snapshot_dir
+        fi
+    done
+}
+
+# Usage: process_test_result <result> <action_on_fail>
+# result: last test result
+# action_on_fail: (exit|wait|proceed)
+# ("wait" is useful if you want to troubleshoot using zookeeper)
+process_test_result() {
+    result=$1
+    if [ $1 -eq 0 ]; then
+        info "test passed"
+    else
+        info "test failed"
+        case "$2" in
+            "wait") info "waiting: hit Ctrl-c to quit"
+                wait
+                ;;
+            "exit") shutdown_servers
+                take_fail_snapshot
+                exit $result
+                ;;
+            *) shutdown_servers
+                take_fail_snapshot
+                info "proceeding"
+                ;;
+        esac
+    fi
+}
+
+test_whitelists() {
+    info "### Testing whitelists"
+    snapshot_prefix="whitelist-test"
+
+    cleanup
+    start_zk
+    start_source_servers
+    start_target_servers
+    sleep 4
+
+    info "starting mirror makers"
+    JMX_PORT=7777 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/whitelisttest_1.consumer.properties --consumer-config $base_dir/config/whitelisttest_2.consumer.properties --producer-config $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_mirrormaker.log &
+    pid_mirrormaker_1=$!
+    JMX_PORT=8888 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/whitelisttest_1.consumer.properties --consumer-config $base_dir/config/whitelisttest_2.consumer.properties --producer-config $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_mirrormaker_2.log &
+    pid_mirrormaker_2=$!
+
+    begin_timer
+
+    start_producer whitetopic01 localhost:2181
+    start_producer whitetopic01 localhost:2182
+    info "waiting for whitetopic01 producers to finish producing ..."
+    wait_partition_done kafka://localhost:9090 whitetopic01 0 kafka://localhost:9091 whitetopic01 0 kafka://localhost:9092 whitetopic01 0 kafka://localhost:9093 whitetopic01 0
+
+    start_producer whitetopic02 localhost:2181
+    start_producer whitetopic03 localhost:2181
+    start_producer whitetopic04 localhost:2182
+    info "waiting for whitetopic02,whitetopic03,whitetopic04 producers to finish producing ..."
+    wait_partition_done kafka://localhost:9090 whitetopic02 0 kafka://localhost:9091 whitetopic02 0 kafka://localhost:9090 whitetopic03 0 kafka://localhost:9091 whitetopic03 0 kafka://localhost:9092 whitetopic04 0 kafka://localhost:9093 whitetopic04 0
+
+    start_producer blacktopic01 localhost:2182
+    info "waiting for blacktopic01 producer to finish producing ..."
+    wait_partition_done kafka://localhost:9092 blacktopic01 0 kafka://localhost:9093 blacktopic01 0
+
+    info "waiting for consumer to finish consuming ..."
+
+    wait_partition_done kafka://localhost:9094 whitetopic01 0 kafka://localhost:9095 whitetopic01 0 kafka://localhost:9094 whitetopic02 0 kafka://localhost:9095 whitetopic02 0 kafka://localhost:9094 whitetopic03 0 kafka://localhost:9095 whitetopic03 0 kafka://localhost:9094 whitetopic04 0 kafka://localhost:9095 whitetopic04 0
+
+    end_timer
+    info "embedded consumer took $((t_end - t_begin)) seconds"
+
+    sleep 2
+
+    # if [[ -d /tmp/kafka-target-1-1-logs/blacktopic01 || /tmp/kafka-target-1-2-logs/blacktopic01 ]]; then
+    #     echo "blacktopic01 found on target cluster"
+    #     result=1
+    # else
+    #     cmp_logs whitetopic01 && cmp_logs whitetopic02 && cmp_logs whitetopic03 && cmp_logs whitetopic04
+    #     result=$?
+    # fi
+
+    cmp_logs blacktopic01
+
+    cmp_logs whitetopic01 && cmp_logs whitetopic02 && cmp_logs whitetopic03 && cmp_logs whitetopic04
+    result=$?
+
+    return $result
+}
+
+test_blacklists() {
+    info "### Testing blacklists"
+    snapshot_prefix="blacklist-test"
+    cleanup
+    start_zk
+    start_source_servers
+    start_target_servers
+    sleep 4
+
+    info "starting mirror maker"
+    $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/blacklisttest.consumer.properties --producer-config $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_mirrormaker.log &
+    pid_mirrormaker_1=$!
+
+    start_producer blacktopic01 localhost:2181
+    start_producer blacktopic02 localhost:2181
+    info "waiting for producer to finish producing blacktopic01,blacktopic02 ..."
+    wait_partition_done kafka://localhost:9090 blacktopic01 0 kafka://localhost:9091 blacktopic01 0 kafka://localhost:9090 blacktopic02 0 kafka://localhost:9091 blacktopic02 0
+
+    begin_timer
+
+    start_producer whitetopic01 localhost:2181
+    info "waiting for producer to finish producing whitetopic01 ..."
+    wait_partition_done kafka://localhost:9090 whitetopic01 0 kafka://localhost:9091 whitetopic01 0
+
+    info "waiting for consumer to finish consuming ..."
+    wait_partition_done kafka://localhost:9094 whitetopic01 0 kafka://localhost:9095 whitetopic01 0
+
+    end_timer
+
+    info "embedded consumer took $((t_end - t_begin)) seconds"
+
+    sleep 2
+
+    cmp_logs blacktopic01 || cmp_logs blacktopic02
+    if [ $? -eq 0 ]; then
+        return 1
+    fi
+    
+    cmp_logs whitetopic01
+    return $?
+}
+
+# main test begins
+
+echo "Test-$test_start_time"
+
+# Ctrl-c trap. Catches INT signal
+trap "shutdown_servers; exit 0" INT
+
+test_whitelists
+result=$?
+
+process_test_result $result $action_on_fail
+
+shutdown_servers
+
+sleep 2
+ 
+test_blacklists
+result=$?
+
+process_test_result $result $action_on_fail
+
+shutdown_servers
+
+exit $result
+
diff --git system_test/mirror_maker/config/blacklisttest.consumer.properties system_test/mirror_maker/config/blacklisttest.consumer.properties
new file mode 100644
index 0000000..162b65d
--- /dev/null
+++ system_test/mirror_maker/config/blacklisttest.consumer.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+
+mirror.topics.blacklist=blacktopic.*
+
diff --git system_test/mirror_maker/config/mirror_producer.properties system_test/mirror_maker/config/mirror_producer.properties
new file mode 100644
index 0000000..5940c24
--- /dev/null
+++ system_test/mirror_maker/config/mirror_producer.properties
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2183
+# broker.list=1:localhost:9094,2:localhost:9095
+
+# timeout in ms for connecting to zookeeper
+# zk.connectiontimeout.ms=1000000
+
+producer.type=async
+
+# to avoid dropping events if the queue is full, wait indefinitely
+queue.enqueueTimeout.ms=-1
+
diff --git system_test/mirror_maker/config/server_source_1_1.properties system_test/mirror_maker/config/server_source_1_1.properties
new file mode 100644
index 0000000..d89c4fb
--- /dev/null
+++ system_test/mirror_maker/config/server_source_1_1.properties
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=1
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9090
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source-1-1-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=10000000
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flasher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
diff --git system_test/mirror_maker/config/server_source_1_2.properties system_test/mirror_maker/config/server_source_1_2.properties
new file mode 100644
index 0000000..063d68b
--- /dev/null
+++ system_test/mirror_maker/config/server_source_1_2.properties
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=2
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9091
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source-1-2-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flasher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
diff --git system_test/mirror_maker/config/server_source_2_1.properties system_test/mirror_maker/config/server_source_2_1.properties
new file mode 100644
index 0000000..998b460
--- /dev/null
+++ system_test/mirror_maker/config/server_source_2_1.properties
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=1
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9092
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source-2-1-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flasher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
diff --git system_test/mirror_maker/config/server_source_2_2.properties system_test/mirror_maker/config/server_source_2_2.properties
new file mode 100644
index 0000000..81427ae
--- /dev/null
+++ system_test/mirror_maker/config/server_source_2_2.properties
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=2
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9093
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source-2-2-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flasher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
diff --git system_test/mirror_maker/config/server_target_1_1.properties system_test/mirror_maker/config/server_target_1_1.properties
new file mode 100644
index 0000000..0265f4e
--- /dev/null
+++ system_test/mirror_maker/config/server_target_1_1.properties
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=1
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9094
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-target-1-1-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2183
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flasher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4
diff --git system_test/mirror_maker/config/server_target_1_2.properties system_test/mirror_maker/config/server_target_1_2.properties
new file mode 100644
index 0000000..a31e9ca
--- /dev/null
+++ system_test/mirror_maker/config/server_target_1_2.properties
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=2
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9095
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-target-1-2-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2183
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4
diff --git system_test/mirror_maker/config/whitelisttest_1.consumer.properties system_test/mirror_maker/config/whitelisttest_1.consumer.properties
new file mode 100644
index 0000000..8fb79fe
--- /dev/null
+++ system_test/mirror_maker/config/whitelisttest_1.consumer.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+
+mirror.topics.whitelist="whitetopic.*"
+
diff --git system_test/mirror_maker/config/whitelisttest_2.consumer.properties system_test/mirror_maker/config/whitelisttest_2.consumer.properties
new file mode 100644
index 0000000..2841544
--- /dev/null
+++ system_test/mirror_maker/config/whitelisttest_2.consumer.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+
+mirror.topics.whitelist="whitetopic.*"
+
diff --git system_test/mirror_maker/config/zookeeper_source_1.properties system_test/mirror_maker/config/zookeeper_source_1.properties
new file mode 100644
index 0000000..f851796
--- /dev/null
+++ system_test/mirror_maker/config/zookeeper_source_1.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_source-1
+# the port at which the clients will connect
+clientPort=2181
diff --git system_test/mirror_maker/config/zookeeper_source_2.properties system_test/mirror_maker/config/zookeeper_source_2.properties
new file mode 100644
index 0000000..d534d18
--- /dev/null
+++ system_test/mirror_maker/config/zookeeper_source_2.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_source-2
+# the port at which the clients will connect
+clientPort=2182
diff --git system_test/mirror_maker/config/zookeeper_target.properties system_test/mirror_maker/config/zookeeper_target.properties
new file mode 100644
index 0000000..55a7eb1
--- /dev/null
+++ system_test/mirror_maker/config/zookeeper_target.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_target
+# the port at which the clients will connect
+clientPort=2183
