diff --git core/src/main/scala/kafka/consumer/ConsoleConsumer.scala core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index bddbb2b..49a6f39 100644
--- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -36,11 +36,19 @@ object ConsoleConsumer extends Logging {
 
   def main(args: Array[String]) {
     val parser = new OptionParser
-    val topicIdOpt = parser.accepts("topic", "REQUIRED: The topic id to consume on.")
+    val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")
                            .withRequiredArg
                            .describedAs("topic")
                            .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + 
+    val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.")
+                             .withRequiredArg
+                             .describedAs("whitelist")
+                             .ofType(classOf[String])
+    val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.")
+                             .withRequiredArg
+                             .describedAs("blacklist")
+                             .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
                                       "Multiple URLS can be given to allow fail-over.")
                            .withRequiredArg
                            .describedAs("urls")
@@ -90,8 +98,20 @@ object ConsoleConsumer extends Logging {
         "skip it instead of halt.")
 
     val options: OptionSet = tryParse(parser, args)
-    checkRequiredArgs(parser, options, topicIdOpt, zkConnectOpt)
+    Utils.checkRequiredArgs(parser, options, zkConnectOpt)
     
+    val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
+    if (topicOrFilterOpt.size != 1) {
+      error("Exactly one of whitelist/blacklist/topic is required.")
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+    val topicArg = options.valueOf(topicOrFilterOpt.head)
+    val filterSpec = if (options.has(blacklistOpt))
+      new Blacklist(topicArg)
+    else
+      new Whitelist(topicArg)
+
     val props = new Properties()
     props.put("groupid", options.valueOf(groupIdOpt))
     props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)
@@ -104,7 +124,6 @@ object ConsoleConsumer extends Logging {
     val config = new ConsumerConfig(props)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     
-    val topic = options.valueOf(topicIdOpt)
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
     
@@ -123,21 +142,20 @@ object ConsoleConsumer extends Logging {
           tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt))
       }
     })
-    
-    var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
-    val iter =
-      if(maxMessages >= 0)
-        stream.slice(0, maxMessages)
-      else
-        stream
+
+    val stream = connector.createMessageStreamsByFilter(filterSpec).head
+    val iter = if(maxMessages >= 0)
+      stream.slice(0, maxMessages)
+    else
+      stream
 
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
 
     try {
-      for(message <- iter) {
+      for(messageAndTopic <- iter) {
         try {
-          formatter.writeTo(message, System.out)
+          formatter.writeTo(messageAndTopic.message, System.out)
         } catch {
           case e =>
             if (skipMessageOnError)
@@ -173,16 +191,6 @@ object ConsoleConsumer extends Logging {
     }
   }
   
-  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
-    for(arg <- required) {
-      if(!options.has(arg)) {
-        error("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-  }
-  
   def tryParseFormatterArgs(args: Iterable[String]): Properties = {
     val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
     if(!splits.forall(_.length == 2)) {
diff --git core/src/main/scala/kafka/consumer/ConsumerConnector.scala core/src/main/scala/kafka/consumer/ConsumerConnector.scala
index 14671d6..5ed0857 100644
--- core/src/main/scala/kafka/consumer/ConsumerConnector.scala
+++ core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -37,6 +37,19 @@ trait ConsumerConnector {
     : Map[String,List[KafkaMessageStream[T]]]
 
   /**
+   *  Create a list of message streams for all topics that match a given filter.
+   *
+   *  @param filterSpec TopicFilter encapsulating a Java-style regex whitelist
+   *         or blacklist.
+   *  @return a list of KafkaMessageStreams that provide iterator over messages
+   *          from whitelisted topics.
+   */
+  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec,
+                                      numStreams: Int = 1,
+                                      decoder: Decoder[T] = new DefaultDecoder)
+    : Seq[KafkaMessageAndTopicStream[T]]
+
+  /**
    *  Commit the offsets of all broker partitions connected by this connector.
    */
   def commitOffsets
diff --git core/src/main/scala/kafka/consumer/ConsumerIterator.scala core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index cb95919..1211194 100644
--- core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -17,77 +17,35 @@
 
 package kafka.consumer
 
-import kafka.utils.{IteratorTemplate, Logging}
-import java.util.concurrent.{TimeUnit, BlockingQueue}
-import kafka.message.MessageAndOffset
+import kafka.utils.Logging
+import java.util.concurrent.BlockingQueue
 import kafka.serializer.Decoder
-import java.util.concurrent.atomic.AtomicReference
+import kafka.message.MessageAndTopic
+
 
 /**
  * An iterator that blocks until a value can be read from the supplied queue.
  * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
  *
  */
-class ConsumerIterator[T](private val topic: String,
-                          private val channel: BlockingQueue[FetchedDataChunk],
+class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
                           consumerTimeoutMs: Int,
                           private val decoder: Decoder[T])
-  extends IteratorTemplate[T] with Logging {
+        extends Iterator[T] with java.util.Iterator[T] with Logging {
 
-  private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
-  private var currentTopicInfo:PartitionTopicInfo = null
-  private var consumedOffset: Long = -1L
+  val topicalConsumerIterator =
+    new TopicalConsumerIterator[T](channel, consumerTimeoutMs, decoder)
 
-  override def next(): T = {
-    val decodedMessage = super.next()
-    if(consumedOffset < 0)
-      throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
-    currentTopicInfo.resetConsumeOffset(consumedOffset)
-    trace("Setting consumed offset to %d".format(consumedOffset))
-    ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
-    decodedMessage
-  }
+  override def next(): T = topicalConsumerIterator.next().message
 
-  protected def makeNext(): T = {
-    var currentDataChunk: FetchedDataChunk = null
-    // if we don't have an iterator, get one
-    var localCurrent = current.get()
-    if(localCurrent == null || !localCurrent.hasNext) {
-      if (consumerTimeoutMs < 0)
-        currentDataChunk = channel.take
-      else {
-        currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
-        if (currentDataChunk == null) {
-          // reset state to make the iterator re-iterable
-          resetState()
-          throw new ConsumerTimeoutException
-        }
-      }
-      if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
-        debug("Received the shutdown command")
-        channel.offer(currentDataChunk)
-        return allDone
-      } else {
-        currentTopicInfo = currentDataChunk.topicInfo
-        if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
-          error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
-                        .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
-          currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
-        }
-        localCurrent = currentDataChunk.messages.iterator
-        current.set(localCurrent)
-      }
-    }
-    val item = localCurrent.next()
-    consumedOffset = item.offset
-    decoder.toEvent(item.message)
+  def remove() {
+    throw new UnsupportedOperationException("Removal not supported")
   }
 
-  def clearCurrentChunk() = {
-    try {
-      info("Clearing the current data chunk for this consumer iterator")
-      current.set(null)
-    }
+  def hasNext = topicalConsumerIterator.hasNext()
+
+  def clearCurrentChunk() {
+    topicalConsumerIterator.clearCurrentChunk()
   }
 }
 
diff --git core/src/main/scala/kafka/consumer/Fetcher.scala core/src/main/scala/kafka/consumer/Fetcher.scala
index d18faca..9d24c2f 100644
--- core/src/main/scala/kafka/consumer/Fetcher.scala
+++ core/src/main/scala/kafka/consumer/Fetcher.scala
@@ -42,24 +42,24 @@ private [consumer] class Fetcher(val config: ConsumerConfig, val zkClient : ZkCl
     fetcherThreads = EMPTY_FETCHER_THREADS
   }
 
-  def clearFetcherQueues[T](topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
+  def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
                             queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
-                            kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+                            messageStreams: Map[String,List[MessageStream]]) {
 
     // Clear all but the currently iterated upon chunk in the consumer thread's queue
     queuesTobeCleared.foreach(_.clear)
     info("Cleared all relevant queues for this fetcher")
 
     // Also clear the currently iterated upon chunk in the consumer threads
-    if(kafkaMessageStreams != null)
-       kafkaMessageStreams.foreach(_._2.foreach(s => s.clear()))
+    if(messageStreams != null)
+       messageStreams.foreach(_._2.foreach(s => s.clear()))
 
     info("Cleared the data chunks in all the consumer message iterators")
 
   }
 
-  def startConnections[T](topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
-                            kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+  def startConnections(topicInfos: Iterable[PartitionTopicInfo],
+                       cluster: Cluster) {
     if (topicInfos == null)
       return
 
diff --git core/src/main/scala/kafka/consumer/KafkaMessageAndTopicStream.scala core/src/main/scala/kafka/consumer/KafkaMessageAndTopicStream.scala
new file mode 100644
index 0000000..f1b9e70
--- /dev/null
+++ core/src/main/scala/kafka/consumer/KafkaMessageAndTopicStream.scala
@@ -0,0 +1,30 @@
+package kafka.consumer
+
+
+import java.util.concurrent.BlockingQueue
+import kafka.serializer.Decoder
+import kafka.message.MessageAndTopic
+
+
+class KafkaMessageAndTopicStream[T](private val queue: BlockingQueue[FetchedDataChunk],
+                                    consumerTimeoutMs: Int,
+                                    private val decoder: Decoder[T])
+   extends Iterable[MessageAndTopic[T]] with java.lang.Iterable[MessageAndTopic[T]] with MessageStream {
+
+  private val iter: ConsumerIterator[T] =
+    new ConsumerIterator[T](queue, consumerTimeoutMs, decoder)
+
+  /**
+   *  Create an iterator over messages in the stream.
+   */
+  def iterator(): TopicalConsumerIterator[T] = iter.topicalConsumerIterator
+
+  /**
+   * This method clears the queue being iterated during the consumer rebalancing. This is mainly
+   * to reduce the number of duplicates received by the consumer
+   */
+  def clear() {
+    iter.clearCurrentChunk()
+  }
+
+}
\ No newline at end of file
diff --git core/src/main/scala/kafka/consumer/KafkaMessageStream.scala core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
index 0771c32..b140c43 100644
--- core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
+++ core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
@@ -24,14 +24,13 @@ import kafka.serializer.Decoder
  * All calls to elements should produce the same thread-safe iterator? Should have a separate thread
  * that feeds messages into a blocking queue for processing.
  */
-class KafkaMessageStream[T](val topic: String,
-                            private val queue: BlockingQueue[FetchedDataChunk],
+class KafkaMessageStream[T](private val queue: BlockingQueue[FetchedDataChunk],
                             consumerTimeoutMs: Int,
                             private val decoder: Decoder[T])
-   extends Iterable[T] with java.lang.Iterable[T]{
+   extends Iterable[T] with java.lang.Iterable[T] with MessageStream {
 
   private val iter: ConsumerIterator[T] =
-    new ConsumerIterator[T](topic, queue, consumerTimeoutMs, decoder)
+    new ConsumerIterator[T](queue, consumerTimeoutMs, decoder)
     
   /**
    *  Create an iterator over messages in the stream.
diff --git core/src/main/scala/kafka/consumer/MessageStream.scala core/src/main/scala/kafka/consumer/MessageStream.scala
new file mode 100644
index 0000000..4ee8cac
--- /dev/null
+++ core/src/main/scala/kafka/consumer/MessageStream.scala
@@ -0,0 +1,6 @@
+package kafka.consumer
+
+
+trait MessageStream {
+  def clear()
+}
\ No newline at end of file
diff --git core/src/main/scala/kafka/consumer/TopicCount.scala core/src/main/scala/kafka/consumer/TopicCount.scala
index 51bf516..6806e99 100644
--- core/src/main/scala/kafka/consumer/TopicCount.scala
+++ core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -19,35 +19,18 @@ package kafka.consumer
 
 import scala.collection._
 import scala.util.parsing.json.JSON
-import kafka.utils.Logging
+import org.I0Itec.zkclient.ZkClient
+import java.util.regex.Pattern
+import kafka.utils.{ZKGroupDirs, ZkUtils, Logging}
 
-private[kafka] object TopicCount extends Logging {
-  val myConversionFunc = {input : String => input.toInt}
-  JSON.globalNumberParser = myConversionFunc
-
-  def constructTopicCount(consumerIdSting: String, jsonString : String) : TopicCount = {
-    var topMap : Map[String,Int] = null
-    try {
-      JSON.parseFull(jsonString) match {
-        case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
-        case None => throw new RuntimeException("error constructing TopicCount : " + jsonString)
-      }
-    }
-    catch {
-      case e =>
-        error("error parsing consumer json string " + jsonString, e)
-        throw e
-    }
-
-    new TopicCount(consumerIdSting, topMap)
-  }
-
-}
 
-private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
+private[kafka] trait TopicCount {
+  def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
 
-  def getConsumerThreadIdsPerTopic()
-    : Map[String, Set[String]] = {
+  def dbString: String
+  
+  protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
+                                            topicCountMap: Map[String,  Int]) = {
     val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
     for ((topic, nConsumers) <- topicCountMap) {
       val consumerSet = new mutable.HashSet[String]
@@ -58,11 +41,94 @@ private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap:
     }
     consumerThreadIdsPerTopicMap
   }
+}
+
+private[kafka] object TopicCount extends Logging {
+
+  /*
+   * Example of whitelist topic count stored in ZooKeeper:
+   * Topics with whitetopic as prefix, and four streams: *4*whitetopic.*
+   * Example of blacklist topic count stored in ZooKeeper:
+   * Topics with blacktopic as prefix, and four streams: !4!blacktopic.*
+   */
+
+  val WHITELIST_MARKER = "*"
+  val BLACKLIST_MARKER = "!"
+  private val WHITELIST_PATTERN =
+    Pattern.compile("\\*(\\p{Digit}+)\\*(.*)")
+  private val BLACKLIST_PATTERN =
+    Pattern.compile("!(\\p{Digit}+)!(.*)")
+
+  val myConversionFunc = {input : String => input.toInt}
+  JSON.globalNumberParser = myConversionFunc
+
+  def constructTopicCount(group: String,
+                          consumerId: String,
+                          zkClient: ZkClient) : TopicCount = {
+    val dirs = new ZKGroupDirs(group)
+    val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
+    val hasWhitelist = topicCountString.startsWith(WHITELIST_MARKER)
+    val hasBlacklist = topicCountString.startsWith(BLACKLIST_MARKER)
+
+    info("Constructing topic count for %s from %s using %s as pattern."
+      .format(consumerId, topicCountString,
+              if (hasWhitelist)  WHITELIST_PATTERN else BLACKLIST_PATTERN))
+
+    if (hasWhitelist || hasBlacklist) {
+      val matcher = if (hasWhitelist)
+        WHITELIST_PATTERN.matcher(topicCountString)
+      else
+        BLACKLIST_PATTERN.matcher(topicCountString)
+      require(matcher.matches())
+      val numStreams = matcher.group(1).toInt
+      val regex = matcher.group(2)
+      val filter = if (hasWhitelist)
+        new TopicFilter(new Whitelist(regex))
+      else
+        new TopicFilter(new Blacklist(regex))
+
+      new WildcardTopicCount(zkClient, consumerId, filter, numStreams)
+    }
+    else {
+      var topMap : Map[String,Int] = null
+      try {
+        JSON.parseFull(topicCountString) match {
+          case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
+          case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString)
+        }
+      }
+      catch {
+        case e =>
+          error("error parsing consumer json string " + topicCountString, e)
+          throw e
+      }
+
+      new StaticTopicCount(consumerId, topMap)
+    }
+  }
+
+  def constructTopicCount(consumerIdString: String, topicCount: Map[String,  Int]) =
+    new StaticTopicCount(consumerIdString, topicCount)
+
+  def constructTopicCount(consumerIdString: String,
+                          filter: TopicFilter,
+                          numStreams: Int,
+                          zkClient: ZkClient) =
+    new WildcardTopicCount(zkClient, consumerIdString, filter, numStreams)
+
+}
+
+private[kafka] class StaticTopicCount(val consumerIdString: String,
+                                val topicCountMap: Map[String, Int])
+                                extends TopicCount {
+
+  def getConsumerThreadIdsPerTopic =
+    makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
 
   override def equals(obj: Any): Boolean = {
     obj match {
       case null => false
-      case n: TopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap
+      case n: StaticTopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap
       case _ => false
     }
   }
@@ -73,7 +139,7 @@ private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap:
    *    "topic2" : 4
    *  }
    */
-  def toJsonString() : String = {
+  def dbString = {
     val builder = new StringBuilder
     builder.append("{ ")
     var i = 0
@@ -84,6 +150,29 @@ private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap:
       i += 1
     }
     builder.append(" }")
-    builder.toString
+    builder.toString()
   }
 }
+
+private[kafka] class WildcardTopicCount(zkClient: ZkClient,
+                                        consumerIdString: String,
+                                        topicFilter: TopicFilter,
+                                        numStreams: Int) extends TopicCount {
+  def getConsumerThreadIdsPerTopic = {
+    val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(
+      zkClient, ZkUtils.BrokerTopicsPath).filter(topicFilter.isTopicAllowed(_))
+    makeConsumerThreadIdsPerTopic(consumerIdString,
+                                  Map(wildcardTopics.map((_, numStreams)): _*))
+  }
+
+  def dbString = {
+    val marker = if (topicFilter.usesWhitelist)
+      TopicCount.WHITELIST_MARKER
+    else
+      TopicCount.BLACKLIST_MARKER
+
+    "%s%d%s%s".format(marker, numStreams, marker, topicFilter.filterSpec.regex)
+  }
+
+}
+
diff --git core/src/main/scala/kafka/consumer/TopicFilter.scala core/src/main/scala/kafka/consumer/TopicFilter.scala
new file mode 100644
index 0000000..fb1125e
--- /dev/null
+++ core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -0,0 +1,62 @@
+package kafka.consumer
+
+
+import kafka.utils.Logging
+import java.util.regex.{PatternSyntaxException, Pattern}
+
+
+sealed abstract class TopicFilterSpec(rawRegex: String) {
+  val regex = rawRegex
+          .trim
+          .replace(',', '|')
+          .replace(" ", "")
+          .replaceAll("^[\"\']+","")
+          .replaceAll("[\"\']+$","") // property files may bring quotes
+
+  try {
+    Pattern.compile(regex)
+  }
+  catch {
+    case e: PatternSyntaxException =>
+      throw new RuntimeException(regex + " is an invalid regex.", e)
+  }
+
+}
+
+case class Whitelist(rawRegex: String) extends TopicFilterSpec(rawRegex) {
+  def isTrivial = regex.matches("[\\p{Alnum}-|]+")
+}
+
+case class Blacklist(rawRegex: String) extends TopicFilterSpec(rawRegex)
+
+class TopicFilter(val filterSpec: TopicFilterSpec) extends Logging {
+
+  def usesWhitelist = filterSpec match {
+    case wl: Whitelist => true
+    case bl: Blacklist => false
+  }
+
+  def usesBlacklist = !usesWhitelist
+
+  def requiresTopicEventWatcher = {
+    filterSpec match {
+      case wl: Whitelist => wl.isTrivial
+      case _ => false
+    }
+  }
+
+  def isTopicAllowed(topic: String) = {
+    val allowed = if (usesWhitelist)
+      topic.matches(filterSpec.regex)
+    else
+      !topic.matches(filterSpec.regex)
+
+    debug("%s %s".format(
+      topic, if (allowed) "allowed" else "filtered"))
+
+    allowed
+  }
+
+  override def toString = filterSpec.regex
+}
+
diff --git core/src/main/scala/kafka/consumer/TopicalConsumerIterator.scala core/src/main/scala/kafka/consumer/TopicalConsumerIterator.scala
new file mode 100644
index 0000000..5defee4
--- /dev/null
+++ core/src/main/scala/kafka/consumer/TopicalConsumerIterator.scala
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import kafka.utils.{IteratorTemplate, Logging}
+import java.util.concurrent.{TimeUnit, BlockingQueue}
+import kafka.serializer.Decoder
+import java.util.concurrent.atomic.AtomicReference
+import kafka.message.{MessageAndTopic, MessageAndOffset}
+
+
+/**
+ * An iterator that blocks until a value can be read from the supplied queue.
+ * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
+ *
+ */
+class TopicalConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
+                          consumerTimeoutMs: Int,
+                          private val decoder: Decoder[T])
+  extends IteratorTemplate[MessageAndTopic[T]] with Logging {
+
+  private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
+  private var currentTopicInfo:PartitionTopicInfo = null
+  private var consumedOffset: Long = -1L
+
+  override def next(): MessageAndTopic[T] = {
+    val item = super.next()
+    if(consumedOffset < 0)
+      throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
+    currentTopicInfo.resetConsumeOffset(consumedOffset)
+    val topic = currentTopicInfo.topic
+    trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
+    ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
+    item
+  }
+
+  protected def makeNext(): MessageAndTopic[T] = {
+    var currentDataChunk: FetchedDataChunk = null
+    // if we don't have an iterator, get one
+    var localCurrent = current.get()
+    if(localCurrent == null || !localCurrent.hasNext) {
+      if (consumerTimeoutMs < 0)
+        currentDataChunk = channel.take
+      else {
+        currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
+        if (currentDataChunk == null) {
+          // reset state to make the iterator re-iterable
+          resetState()
+          throw new ConsumerTimeoutException
+        }
+      }
+      if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
+        debug("Received the shutdown command")
+        channel.offer(currentDataChunk)
+        return allDone
+      } else {
+        currentTopicInfo = currentDataChunk.topicInfo
+        if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
+          error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
+                        .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
+          currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
+        }
+        localCurrent = currentDataChunk.messages.iterator
+        current.set(localCurrent)
+      }
+    }
+    val item = localCurrent.next()
+    consumedOffset = item.offset
+
+    new MessageAndTopic[T](item.message, currentTopicInfo.topic, decoder)
+  }
+
+  def clearCurrentChunk() {
+    // Reset the chunk reference so a rebalance does not replay a stale chunk.
+    // Nothing here can throw, so no try wrapper is needed.
+    info("Clearing the current data chunk for this consumer iterator")
+    current.set(null)
+  }
+}
+
diff --git core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 28e027b..7e68c4a 100644
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -34,6 +34,7 @@ import kafka.common.{ConsumerRebalanceFailedException, InvalidConfigException}
 import java.lang.IllegalStateException
 import kafka.utils.ZkUtils._
 
+
 /**
  * This class handles the consumers interaction with zookeeper
  *
@@ -86,8 +87,8 @@ trait ZookeeperConsumerConnectorMBean {
 
 private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                 val enableFetcher: Boolean) // for testing only
-  extends ConsumerConnector with ZookeeperConsumerConnectorMBean with Logging {
-
+        extends ConsumerConnector with ZookeeperConsumerConnectorMBean
+                with TopicEventHandler[String] with Logging {
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
   private var fetcher: Option[Fetcher] = None
@@ -96,6 +97,33 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   // queues : (topic,consumerThreadId) -> queue
   private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
+  private val messageStreamCreated = new AtomicBoolean(false)
+
+  // fields for regex-based message streams
+  private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
+  private var wildcardTopicFilter: TopicFilter = null
+  private var wildcardTopics: Seq[String] = List()
+  private var numWildcardStreams = 0
+  private var wildcardQueues: Seq[LinkedBlockingQueue[FetchedDataChunk]] = null
+  private var wildcardStreams: Seq[KafkaMessageAndTopicStream[_]] = null
+
+  private var sessionExpirationListener: ZKSessionExpireListener = null
+  private var loadBalancerListener: ZKRebalancerListener = null
+
+  val consumerIdString = {
+    var consumerUuid : String = null
+    config.consumerId match {
+      case Some(consumerId) // for testing only
+      => consumerUuid = consumerId
+      case None // generate unique consumerId automatically
+      => val uuid = UUID.randomUUID()
+      consumerUuid = "%s-%d-%s".format(
+        InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
+        uuid.getMostSignificantBits().toHexString.substring(0,8))
+    }
+    config.groupId + "_" + consumerUuid
+  }
+  this.logIdent = consumerIdString + " "
 
   connectZk()
   createFetcher()
@@ -109,9 +137,64 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def createMessageStreams[T](topicCountMap: Map[String,Int],
                               decoder: Decoder[T])
       : Map[String,List[KafkaMessageStream[T]]] = {
+    if (messageStreamCreated.getAndSet(true))
+      throw new RuntimeException(this.getClass.getSimpleName +
+                                   " can create message streams at most once")
     consume(topicCountMap, decoder)
   }
 
+  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec, numStreams: Int, decoder: Decoder[T]) = {
+    if (messageStreamCreated.getAndSet(true))
+      throw new RuntimeException(this.getClass.getSimpleName +
+                                         " can create message streams at most once")
+
+    numWildcardStreams = numStreams
+    wildcardQueues = (1 to numWildcardStreams)
+      .map(_ => new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks))
+    wildcardStreams = wildcardQueues.map(
+      new KafkaMessageAndTopicStream[T](_, config.consumerTimeoutMs, decoder))
+
+    wildcardTopicFilter = new TopicFilter(filterSpec)
+
+    wildcardTopics = getChildrenParentMayNotExist(
+      zkClient, BrokerTopicsPath).filter(wildcardTopicFilter.isTopicAllowed)
+
+    val wildcardTopicCount = TopicCount.constructTopicCount(
+      consumerIdString, wildcardTopicFilter, numWildcardStreams, zkClient)
+
+    val topicStreamsMap =
+      new mutable.HashMap[String,List[KafkaMessageAndTopicStream[T]]]
+
+    loadBalancerListener = new ZKRebalancerListener(
+      config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[kafka.consumer.MessageStream]]])
+
+    val dirs = new ZKGroupDirs(config.groupId)
+    registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
+
+    // Register listener for session expired event.
+    sessionExpirationListener = new ZKSessionExpireListener(
+      dirs, consumerIdString, wildcardTopicCount, loadBalancerListener)
+
+    if (wildcardTopics.nonEmpty)
+      consumeWildCardTopics()
+
+    if (wildcardTopicFilter.requiresTopicEventWatcher) {
+      info("Not creating event watcher for trivial whitelist " + wildcardTopicFilter)
+    }
+    else {
+      info("Creating topic event watcher for topic filter " + wildcardTopicFilter)
+      wildcardTopicWatcher = new ZookeeperTopicEventWatcher(config, this)
+
+      /*
+       * Topic events will trigger subsequent synced rebalances. Also, the
+       * consumer will get registered only after an allowed topic becomes
+       * available.
+       */
+    }
+
+    wildcardStreams.toSeq.asInstanceOf[Seq[KafkaMessageAndTopicStream[T]]]
+  }
+
   private def createFetcher() {
     if (enableFetcher)
       fetcher = Some(new Fetcher(config, zkClient))
@@ -126,6 +209,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
       info("ZKConsumerConnector shutting down")
+      if (wildcardTopicWatcher != null)
+        wildcardTopicWatcher.shutdown()
       try {
         scheduler.shutdownNow()
         fetcher match {
@@ -158,18 +243,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     val dirs = new ZKGroupDirs(config.groupId)
     var ret = new mutable.HashMap[String,List[KafkaMessageStream[T]]]
 
-    var consumerUuid : String = null
-    config.consumerId match {
-      case Some(consumerId) // for testing only
-      => consumerUuid = consumerId
-      case None // generate unique consumerId automatically
-      => val uuid = UUID.randomUUID()
-        consumerUuid = "%s-%d-%s".format(
-          InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
-          uuid.getMostSignificantBits().toHexString.substring(0,8))
-    }
-    val consumerIdString = config.groupId + "_" + consumerUuid
-    val topicCount = new TopicCount(consumerIdString, topicCountMap)
+    val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
 
     // create a queue per topic per consumer thread
     val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic
@@ -178,19 +252,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       for (threadId <- threadIdSet) {
         val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         queues.put((topic, threadId), stream)
-        streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder)
+        streamList ::= new KafkaMessageStream[T](stream, config.consumerTimeoutMs, decoder)
       }
       ret += (topic -> streamList)
       debug("adding topic " + topic + " and stream to map..")
     }
 
     // listener to consumer and partition changes
-    val loadBalancerListener = new ZKRebalancerListener[T](config.groupId, consumerIdString, ret)
+    loadBalancerListener = new ZKRebalancerListener(
+      config.groupId, consumerIdString,
+      ret.asInstanceOf[mutable.Map[String, List[MessageStream]]])
     registerConsumerInZK(dirs, consumerIdString, topicCount)
 
     // register listener for session expired event
-    zkClient.subscribeStateChanges(
-      new ZKSessionExpireListener[T](dirs, consumerIdString, topicCount, loadBalancerListener))
+    sessionExpirationListener = new ZKSessionExpireListener(
+      dirs, consumerIdString, topicCount, loadBalancerListener)
+    zkClient.subscribeStateChanges(sessionExpirationListener)
 
     zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
 
@@ -205,9 +282,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     ret
   }
 
-  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
+  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
     info("begin registering consumer " + consumerIdString + " in ZK")
-    createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
+    createEphemeralPathExpectConflict(zkClient,
+                                      dirs.consumerRegistryDir + "/" + consumerIdString,
+                                      topicCount.dbString)
     info("end registering consumer " + consumerIdString + " in ZK")
   }
 
@@ -334,10 +413,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     producedOffset
   }
 
-  class ZKSessionExpireListener[T](val dirs: ZKGroupDirs,
+  class ZKSessionExpireListener(val dirs: ZKGroupDirs,
                                  val consumerIdString: String,
                                  val topicCount: TopicCount,
-                                 val loadBalancerListener: ZKRebalancerListener[T])
+                                 val loadBalancerListener: ZKRebalancerListener)
     extends IZkStateListener {
     @throws(classOf[Exception])
     def handleStateChanged(state: KeeperState) {
@@ -359,10 +438,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
        *  consumer in the consumer registry and trigger a rebalance.
        */
       info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString)
-      loadBalancerListener.resetState
+      loadBalancerListener.resetState()
       registerConsumerInZK(dirs, consumerIdString, topicCount)
       // explicitly trigger load balancing for this consumer
-      loadBalancerListener.syncedRebalance
+      loadBalancerListener.syncedRebalance()
 
       // There is no need to resubscribe to child and state changes.
       // The child change watchers will be set inside rebalance when we read the children list.
@@ -370,8 +449,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   }
 
-  class ZKRebalancerListener[T](val group: String, val consumerIdString: String,
-                                kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]])
+  class ZKRebalancerListener(val group: String, val consumerIdString: String,
+                             val kafkaMessageStreams: mutable.Map[String,List[MessageStream]])
     extends IZkChildListener {
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
@@ -468,7 +547,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
 
     private def rebalance(cluster: Cluster): Boolean = {
-      val myTopicThreadIdsMap = getTopicCount(zkClient, group, consumerIdString).getConsumerThreadIdsPerTopic
+      val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
       val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
 
@@ -539,12 +618,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
 
     private def closeFetchersForQueues(cluster: Cluster,
-                                       kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+                                       messageStreams: Map[String,List[MessageStream]],
                                        queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
       var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
       fetcher match {
         case Some(f) => f.stopConnectionsToAllBrokers
-        f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, kafkaMessageStreams)
+        f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
         info("Committing all offsets after clearing the fetcher queues")
         /**
         * here, we need to commit offsets before stopping the consumer from returning any more messages
@@ -559,16 +638,16 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       }
     }
 
-    private def closeFetchers(cluster: Cluster, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+    private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[MessageStream]],
                               relevantTopicThreadIdsMap: Map[String, Set[String]]) {
       // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
       // after this rebalancing attempt
       val queuesTobeCleared = queues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
-      closeFetchersForQueues(cluster, kafkaMessageStreams, queuesTobeCleared)
+      closeFetchersForQueues(cluster, messageStreams, queuesTobeCleared)
     }
 
-    private def updateFetcher[T](cluster: Cluster,
-                                 kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+    private def updateFetcher(cluster: Cluster,
+      messageStreams: Map[String,List[MessageStream]]) {
       // update partitions for fetcher
       var allPartitionInfos : List[PartitionTopicInfo] = Nil
       for (partitionInfos <- topicRegistry.values)
@@ -579,7 +658,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
       fetcher match {
         case Some(f) =>
-          f.startConnections(allPartitionInfos, cluster, kafkaMessageStreams)
+          f.startConnections(allPartitionInfos, cluster)
         case None =>
       }
     }
@@ -651,5 +730,70 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       debug(partTopicInfo + " selected new offset " + offset)
     }
   }
+
+  private def consumeWildCardTopics() {
+    val dirs = new ZKGroupDirs(config.groupId)
+
+    val wildcardTopicCount = TopicCount.constructTopicCount(
+      consumerIdString, wildcardTopicFilter, numWildcardStreams, zkClient)
+
+    val consumerThreadIdsPerTopic = wildcardTopicCount.getConsumerThreadIdsPerTopic
+
+    val topicStreamsMap = loadBalancerListener.kafkaMessageStreams
+
+    // Break up {"topic1" -> Set(thread1, thread2, ...), ... } into
+    // list of (topic, thread-id) pairs
+    implicit def groupTopicThreadIds(v: (String, Set[String])) = v._2.map((v._1, _))
+    val groupedTopicThreadIds = consumerThreadIdsPerTopic.flatten
+
+    val updatedQueueMap =
+      ((for (tt <- groupedTopicThreadIds; q <- wildcardQueues) yield (tt -> q)) toMap)
+
+    updatedQueueMap.foreach(e => {
+      queues.put((e._1._1, e._1._2), e._2)
+      topicStreamsMap += (e._1._1 -> wildcardStreams.toList)
+    })
+
+    zkClient.subscribeStateChanges(sessionExpirationListener)
+
+    zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+
+    topicStreamsMap.foreach { topicAndStreams =>
+    // register on broker partition path changes
+      val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
+      zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
+    }
+
+    loadBalancerListener.syncedRebalance()
+  }
+
+  def handleTopicEvent(allTopics: Seq[String]) {
+    debug("Handling topic event")
+
+    val updatedTopics = allTopics.filter(wildcardTopicFilter.isTopicAllowed)
+
+    val addedTopics = updatedTopics filterNot (wildcardTopics contains)
+    if (addedTopics.nonEmpty)
+      info("Topic event: added topics = %s"
+                           .format(addedTopics))
+
+    /*
+     * Deleted topics are interesting (and will not be a concern until 0.8
+     * release): if topics have been deleted, we do not need to unsubscribe
+     * watching their child changes, since if they come back, we do want to
+     * get notified.
+     */
+    val deletedTopics = wildcardTopics filterNot (updatedTopics contains)
+    if (deletedTopics.nonEmpty)
+      info("Topic event: deleted topics = %s"
+                           .format(deletedTopics))
+
+    wildcardTopics = updatedTopics
+    info("Topics to consume = %s".format(wildcardTopics))
+
+    if (addedTopics.nonEmpty || deletedTopics.nonEmpty)
+      consumeWildCardTopics()
+  }
+
 }
 
diff --git core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index eb563e1..df83baa 100644
--- core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -21,11 +21,9 @@ import scala.collection.JavaConversions._
 import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import kafka.server.KafkaServerStartable
-import kafka.common.ConsumerRebalanceFailedException
 
 class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
-    val eventHandler: TopicEventHandler[String], kafkaServerStartable: KafkaServerStartable) extends Logging {
+    val eventHandler: TopicEventHandler[String]) extends Logging {
 
   val lock = new Object()
 
@@ -35,7 +33,7 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
   startWatchingTopicEvents()
 
   private def startWatchingTopicEvents() {
-    val topicEventListener = new ZkTopicEventListener(kafkaServerStartable)
+    val topicEventListener = new ZkTopicEventListener()
     ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
 
     zkClient.subscribeStateChanges(
@@ -52,6 +50,7 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
 
   def shutdown() {
     lock.synchronized {
+      info("Shutting down topic event watcher.")
       if (zkClient != null) {
         stopWatchingTopicEvents()
         zkClient.close()
@@ -62,7 +61,7 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
     }
   }
 
-  class ZkTopicEventListener(val kafkaServerStartable: KafkaServerStartable) extends IZkChildListener {
+  class ZkTopicEventListener extends IZkChildListener {
 
     @throws(classOf[Exception])
     def handleChildChange(parent: String, children: java.util.List[String]) {
@@ -76,11 +75,8 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
           }
         }
         catch {
-          case e: ConsumerRebalanceFailedException =>
-            fatal("can't rebalance in embedded consumer; proceed to shutdown", e)
-            kafkaServerStartable.shutdown()
           case e =>
-            error("error in handling child changes in embedded consumer", e)
+            error("error in handling child changes", e)
         }
       }
     }
diff --git core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
index 6f50e97..0bfa6f2 100644
--- core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+++ core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
@@ -17,34 +17,53 @@
 
 package kafka.javaapi.consumer;
 
-import kafka.consumer.KafkaMessageStream;
-import kafka.message.Message;
-import kafka.serializer.Decoder;
 
 import java.util.List;
 import java.util.Map;
+import kafka.consumer.KafkaMessageAndTopicStream;
+import kafka.consumer.KafkaMessageStream;
+import kafka.consumer.TopicFilterSpec;
+import kafka.message.Message;
+import kafka.serializer.Decoder;
 
 public interface ConsumerConnector {
-    /**
-     *  Create a list of MessageStreams of type T for each topic.
-     *
-     *  @param topicCountMap  a map of (topic, #streams) pair
-     *  @param decoder a decoder that converts from Message to T
-     *  @return a map of (topic, list of  KafkaMessageStream) pair. The number of items in the
-     *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
-     */
-    public <T> Map<String, List<KafkaMessageStream<T>>> createMessageStreams(
-            Map<String, Integer> topicCountMap, Decoder<T> decoder);
-    public Map<String, List<KafkaMessageStream<Message>>> createMessageStreams(
-            Map<String, Integer> topicCountMap);
+  /**
+   *  Create a list of MessageStreams of type T for each topic.
+   *
+   *  @param topicCountMap  a map of (topic, #streams) pair
+   *  @param decoder a decoder that converts from Message to T
+   *  @return a map of (topic, list of  KafkaMessageStream) pair. The number of items in the
+   *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
+   */
+  public <T> Map<String, List<KafkaMessageStream<T>>> createMessageStreams(
+      Map<String, Integer> topicCountMap, Decoder<T> decoder);
+  public Map<String, List<KafkaMessageStream<Message>>> createMessageStreams(
+      Map<String, Integer> topicCountMap);
+
+  /**
+   *  Create a list of MessageAndTopicStreams containing messages of type T.
+   *
+   *  @param filterSpec a TopicFilterSpec that specifies which topics to
+   *                    subscribe to (encapsulates a whitelist or a blacklist).
+   *  @param numStreams the number of message streams to return.
+   *  @param decoder a decoder that converts from Message to T
+   *  @return a list of KafkaMessageAndTopicStream. Each stream supports an
+   *          iterator over its MessageAndTopic elements.
+   */
+  public <T> List<KafkaMessageAndTopicStream<T>> createMessageStreamsByFilter(
+      TopicFilterSpec filterSpec, int numStreams, Decoder<T> decoder);
+  public <T> List<KafkaMessageAndTopicStream<T>> createMessageStreamsByFilter(
+      TopicFilterSpec filterSpec, int numStreams);
+  public <T> List<KafkaMessageAndTopicStream<T>> createMessageStreamsByFilter(
+      TopicFilterSpec filterSpec);
 
-    /**
-     *  Commit the offsets of all broker partitions connected by this connector.
-     */
-    public void commitOffsets();
+  /**
+   *  Commit the offsets of all broker partitions connected by this connector.
+   */
+  public void commitOffsets();
 
-    /**
-     *  Shut down the connector
-     */
-    public void shutdown();
+  /**
+   *  Shut down the connector
+   */
+  public void shutdown();
 }
diff --git core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 0ee7488..4d0f78c 100644
--- core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -16,9 +16,10 @@
  */
 package kafka.javaapi.consumer
 
-import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
 import kafka.message.Message
 import kafka.serializer.{DefaultDecoder, Decoder}
+import kafka.consumer.{TopicFilterSpec, TopicFilter, KafkaMessageStream, ConsumerConfig}
+
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -88,6 +89,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       : java.util.Map[String,java.util.List[KafkaMessageStream[Message]]] =
     createMessageStreams(topicCountMap, new DefaultDecoder)
 
+  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec, numStreams: Int, decoder: Decoder[T]) =
+    underlying.createMessageStreamsByFilter(filterSpec, numStreams, decoder)
+
+  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec, numStreams: Int) =
+    underlying.createMessageStreamsByFilter(filterSpec, numStreams)
+
+  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec) =
+    underlying.createMessageStreamsByFilter(filterSpec)
 
   def commitOffsets() {
     underlying.commitOffsets
diff --git core/src/main/scala/kafka/message/MessageAndTopic.scala core/src/main/scala/kafka/message/MessageAndTopic.scala
new file mode 100644
index 0000000..47a0dc0
--- /dev/null
+++ core/src/main/scala/kafka/message/MessageAndTopic.scala
@@ -0,0 +1,9 @@
+package kafka.message
+
+import kafka.serializer.Decoder
+
+class MessageAndTopic[T](msg: Message, val topic: String, decoder: Decoder[T]) {
+
+  def message = decoder.toEvent(msg)
+
+}
diff --git core/src/main/scala/kafka/server/KafkaServerStartable.scala core/src/main/scala/kafka/server/KafkaServerStartable.scala
index 3e196e8..4ec30fa 100644
--- core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -24,6 +24,7 @@ import kafka.message.Message
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.Map
+import kafka.common.ConsumerRebalanceFailedException
 
 class KafkaServerStartable(val serverConfig: KafkaConfig,
                            val consumerConfig: ConsumerConfig,
@@ -37,6 +38,8 @@ class KafkaServerStartable(val serverConfig: KafkaConfig,
 
   private def init() {
     server = new KafkaServer(serverConfig)
+    // embedded consumer can be deprecated eventually - we should switch to
+    // using MirrorMaker tool
     if (consumerConfig != null)
       embeddedConsumer =
         new EmbeddedConsumer(consumerConfig, producerConfig, this)
@@ -117,7 +120,14 @@ class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
 
     if (addedTopics.nonEmpty || deletedTopics.nonEmpty) {
       info("mirror topics = %s".format(mirrorTopics))
-      startNewConsumerThreads(makeTopicMap(mirrorTopics))
+      try {
+        startNewConsumerThreads(makeTopicMap(mirrorTopics))
+      }
+      catch {
+        case e: ConsumerRebalanceFailedException =>
+          fatal("can't rebalance; proceed to shutdown", e)
+          kafkaServerStartable.shutdown()
+      }
     }
   }
 
@@ -161,7 +171,7 @@ class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
 
   def startup() {
     info("staring up embedded consumer")
-    topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this, kafkaServerStartable)
+    topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this)
     /*
     * consumer threads are (re-)started upon topic events (which includes an
     * initial startup event which lists the current topics)
diff --git core/src/main/scala/kafka/tools/MirrorMaker.scala core/src/main/scala/kafka/tools/MirrorMaker.scala
new file mode 100644
index 0000000..6563e50
--- /dev/null
+++ core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import kafka.message.Message
+import joptsimple.OptionParser
+import kafka.utils.{Utils, Logging}
+import kafka.producer.{ProducerData, ProducerConfig, Producer}
+import scala.collection.JavaConversions._
+import java.util.concurrent.CountDownLatch
+import kafka.consumer._
+
+
+object MirrorMaker extends Logging {
+
+  def main(args: Array[String]) {
+    
+    info ("Starting mirror maker")
+    val parser = new OptionParser
+
+    val consumerConfigOpt = parser.accepts("consumer-config",
+      "Consumer config to consume from a source cluster. " +
+      "You may specify multiple of these.")
+      .withRequiredArg()
+      .describedAs("config file")
+      .ofType(classOf[String])
+
+    val producerConfigOpt = parser.accepts("producer-config",
+      "Embedded producer config.")
+      .withRequiredArg()
+      .describedAs("config file")
+      .ofType(classOf[String])
+    
+    val numStreamsOpt = parser.accepts("num-streams",
+      "Number of consumption streams.")
+      .withRequiredArg()
+      .describedAs("Number of threads")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+    
+    val whitelistOpt = parser.accepts("whitelist",
+      "Whitelist of topics to mirror.")
+      .withRequiredArg()
+      .describedAs("Java regex (String)")
+      .ofType(classOf[String])
+
+    val blacklistOpt = parser.accepts("blacklist",
+            "Blacklist of topics to mirror.")
+            .withRequiredArg()
+            .describedAs("Java regex (String)")
+            .ofType(classOf[String])
+
+    val helpOpt = parser.accepts("help", "Print this message.")
+
+    val options = parser.parse(args : _*)
+
+    if (options.has(helpOpt)) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    Utils.checkRequiredArgs(
+      parser, options, consumerConfigOpt, producerConfigOpt)
+    val topicOrFilterOpt = List(whitelistOpt, blacklistOpt).filter(options.has)
+    if (topicOrFilterOpt.size != 1) {
+      println("Exactly one of whitelist or blacklist is required.")
+      System.exit(1)
+    }
+
+    val numStreams = options.valueOf(numStreamsOpt)
+
+    val producer = {
+      val config = new ProducerConfig(
+        Utils.loadProps(options.valueOf(producerConfigOpt)))
+      new Producer[Null, Message](config)
+    }
+
+    val threads = {
+      val connectors = options.valuesOf(consumerConfigOpt).toList
+              .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
+              .map(new ZookeeperConsumerConnector(_))
+
+      Runtime.getRuntime.addShutdownHook(new Thread() {
+        override def run() {
+          connectors.foreach(_.shutdown())
+        }
+      })
+
+      val filterSpec = if (options.has(whitelistOpt))
+        new Whitelist(options.valueOf(whitelistOpt))
+      else
+        new Blacklist(options.valueOf(blacklistOpt))
+
+      val streams =
+        connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue()))
+
+      streams.flatten.zipWithIndex.map(streamAndIndex => {
+        new MirrorMakerThread(streamAndIndex._1, producer, streamAndIndex._2)
+      })
+    }
+
+    threads.foreach(_.start())
+
+    threads.foreach(_.awaitShutdown())
+  }
+
+  class MirrorMakerThread(stream: KafkaMessageAndTopicStream[Message],
+                          producer: Producer[Null, Message],
+                          threadId: Int)
+          extends Thread with Logging {
+
+    private val shutdownLatch = new CountDownLatch(1)
+    private val threadName = "mirrormaker-" + threadId
+
+    this.setName(threadName)
+
+    override def run() {
+      try {
+        for (msgAndTopic <- stream) {
+          val pd = new ProducerData[Null, Message](
+            msgAndTopic.topic, msgAndTopic.message)
+          producer.send(pd)
+        }
+      }
+      catch {
+        case e =>
+          fatal("%s stream unexpectedly exited.", e)
+      }
+      finally {
+        shutdownLatch.countDown()
+        info("Stopped thread %s.".format(threadName))
+      }
+    }
+
+    def awaitShutdown() {
+      try {
+        shutdownLatch.await()
+      }
+      catch {
+        case e: InterruptedException => fatal(
+          "Shutdown of thread %s interrupted. This might leak data!"
+                  .format(threadName))
+      }
+    }
+  }
+}
+
diff --git core/src/main/scala/kafka/utils/Logging.scala core/src/main/scala/kafka/utils/Logging.scala
index 2e664f5..6e05eb4 100644
--- core/src/main/scala/kafka/utils/Logging.scala
+++ core/src/main/scala/kafka/utils/Logging.scala
@@ -23,72 +23,76 @@ trait Logging {
   val loggerName = this.getClass.getName
   lazy val logger = Logger.getLogger(loggerName)
 
+  protected var logIdent = ""
+  
+  private def msgWithLogIdent(msg: String) = "%s%s".format(logIdent, msg)
+
   def trace(msg: => String): Unit = {
     if (logger.isTraceEnabled())
-      logger.trace(msg)	
+      logger.trace(msgWithLogIdent(msg))
   }
   def trace(e: => Throwable): Any = {
     if (logger.isTraceEnabled())
-      logger.trace("",e)	
+      logger.trace(logIdent,e)
   }
   def trace(msg: => String, e: => Throwable) = {
     if (logger.isTraceEnabled())
-      logger.trace(msg,e)
+      logger.trace(msgWithLogIdent(msg),e)
   }
 
   def debug(msg: => String): Unit = {
     if (logger.isDebugEnabled())
-      logger.debug(msg)
+      logger.debug(msgWithLogIdent(msg))
   }
   def debug(e: => Throwable): Any = {
     if (logger.isDebugEnabled())
-      logger.debug("",e)	
+      logger.debug(logIdent,e)
   }
   def debug(msg: => String, e: => Throwable) = {
     if (logger.isDebugEnabled())
-      logger.debug(msg,e)
+      logger.debug(msgWithLogIdent(msg),e)
   }
 
   def info(msg: => String): Unit = {
     if (logger.isInfoEnabled())
-      logger.info(msg)
+      logger.info(msgWithLogIdent(msg))
   }
   def info(e: => Throwable): Any = {
     if (logger.isInfoEnabled())
-      logger.info("",e)
+      logger.info(logIdent,e)
   }
   def info(msg: => String,e: => Throwable) = {
     if (logger.isInfoEnabled())
-      logger.info(msg,e)
+      logger.info(msgWithLogIdent(msg),e)
   }
 
   def warn(msg: => String): Unit = {
-    logger.warn(msg)
+    logger.warn(msgWithLogIdent(msg))
   }
   def warn(e: => Throwable): Any = {
-    logger.warn("",e)
+    logger.warn(logIdent,e)
   }
   def warn(msg: => String, e: => Throwable) = {
-    logger.warn(msg,e)
+    logger.warn(msgWithLogIdent(msg),e)
   }	
 
   def error(msg: => String): Unit = {
-    logger.error(msg)
+    logger.error(msgWithLogIdent(msg))
   }		
   def error(e: => Throwable): Any = {
-    logger.error("",e)
+    logger.error(logIdent,e)
   }
   def error(msg: => String, e: => Throwable) = {
-    logger.error(msg,e)
+    logger.error(msgWithLogIdent(msg),e)
   }
 
   def fatal(msg: => String): Unit = {
-    logger.fatal(msg)
+    logger.fatal(msgWithLogIdent(msg))
   }
   def fatal(e: => Throwable): Any = {
-    logger.fatal("",e)
+    logger.fatal(logIdent,e)
   }	
   def fatal(msg: => String, e: => Throwable) = {
-    logger.fatal(msg,e)
+    logger.fatal(msgWithLogIdent(msg),e)
   }
 }
diff --git core/src/main/scala/kafka/utils/Utils.scala core/src/main/scala/kafka/utils/Utils.scala
index 7b8b5ae..6085838 100644
--- core/src/main/scala/kafka/utils/Utils.scala
+++ core/src/main/scala/kafka/utils/Utils.scala
@@ -29,12 +29,14 @@ import scala.collection._
 import scala.collection.mutable
 import kafka.message.{NoCompressionCodec, CompressionCodec}
 import org.I0Itec.zkclient.ZkClient
+import joptsimple.{OptionSpec, OptionSet, OptionParser}
+import java.util.regex.Pattern
+
 
 /**
  * Helper functions!
  */
 object Utils extends Logging {
-  
   /**
    * Wrap the given function in a java.lang.Runnable
    * @param fun A function
@@ -657,6 +659,16 @@ object Utils extends Logging {
       case _ => // swallow
     }
   }
+
+  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
+    for(arg <- required) {
+      if(!options.has(arg)) {
+        error("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+  }
 }
 
 class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
diff --git core/src/main/scala/kafka/utils/ZkUtils.scala core/src/main/scala/kafka/utils/ZkUtils.scala
index 917da0f..caddb06 100644
--- core/src/main/scala/kafka/utils/ZkUtils.scala
+++ core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -243,17 +243,11 @@ object ZkUtils extends Logging {
     getChildren(zkClient, dirs.consumerRegistryDir)
   }
 
-  def getTopicCount(zkClient: ZkClient, group: String, consumerId: String) : TopicCount = {
-    val dirs = new ZKGroupDirs(group)
-    val topicCountJson = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
-    TopicCount.constructTopicCount(consumerId, topicCountJson)
-  }
-
   def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] = {
     val dirs = new ZKGroupDirs(group)
     val consumersInGroup = getConsumersInGroup(zkClient, group)
     val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,
-      ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)))
+      ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId), zkClient))
     consumersInGroup.zip(topicCountMaps).toMap
   }
 
@@ -262,8 +256,8 @@ object ZkUtils extends Logging {
     val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
     val consumersPerTopicMap = new mutable.HashMap[String, List[String]]
     for (consumer <- consumers) {
-      val topicCount = getTopicCount(zkClient, group, consumer)
-      for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic()) {
+      val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient)
+      for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {
         for (consumerThreadId <- consumerThreadIdSet)
           consumersPerTopicMap.get(topic) match {
             case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
diff --git core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
deleted file mode 100644
index 1813444..0000000
--- core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.consumer
-
-import junit.framework.Assert._
-import org.junit.Test
-import org.scalatest.junit.JUnitSuite
-import kafka.cluster.Partition
-
-
-class TopicCountTest extends JUnitSuite {
-
-  @Test
-  def testBasic() {
-    val consumer = "conusmer1"
-    val json = """{ "topic1" : 2, "topic2" : 3 }"""
-    val topicCount = TopicCount.constructTopicCount(consumer, json)
-    val topicCountMap = Map(
-      "topic1" -> 2,
-      "topic2" -> 3
-      )
-    val expectedTopicCount = new TopicCount(consumer, topicCountMap)
-    assertTrue(expectedTopicCount == topicCount)
-
-    val topicCount2 = TopicCount.constructTopicCount(consumer, expectedTopicCount.toJsonString)
-    assertTrue(expectedTopicCount == topicCount2)
-  }
-
-  @Test
-  def testPartition() {
-    assertTrue(new Partition(10, 0) == new Partition(10, 0))
-    assertTrue(new Partition(10, 1) != new Partition(10, 0))
-  }
-}
diff --git core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
new file mode 100644
index 0000000..e3812e5
--- /dev/null
+++ core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -0,0 +1,38 @@
+package kafka.consumer
+
+
+import junit.framework.Assert._
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+
+
+class TopicFilterTest extends JUnitSuite {
+
+  @Test
+  def testWhitelists() {
+
+    val whitelist1 = new Whitelist("white1,white2")
+    val topicFilter1 = new TopicFilter(whitelist1)
+    assertTrue(topicFilter1.requiresTopicEventWatcher)
+    assertTrue(topicFilter1.isTopicAllowed("white2"))
+    assertFalse(topicFilter1.isTopicAllowed("black1"))
+
+    val whitelist2 = new Whitelist(".+")
+    val topicFilter2 = new TopicFilter(whitelist2)
+    assertFalse(topicFilter2.requiresTopicEventWatcher)
+    assertTrue(topicFilter2.isTopicAllowed("alltopics"))
+    
+    val whitelist3 = new Whitelist("white_listed-topic.+")
+    val topicFilter3 = new TopicFilter(whitelist3)
+    assertFalse(topicFilter3.requiresTopicEventWatcher)
+    assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1"))
+    assertFalse(topicFilter3.isTopicAllowed("black1"))
+  }
+
+  @Test
+  def testBlacklists() {
+    val blacklist1 = new Blacklist("black1")
+    val topicFilter1 = new TopicFilter(blacklist1)
+    assertFalse(topicFilter1.requiresTopicEventWatcher)
+  }
+}
\ No newline at end of file
diff --git core/src/test/scala/unit/kafka/integration/FetcherTest.scala core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 40b7ff4..915af85 100644
--- core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -56,7 +56,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
     super.setUp
     fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null)
     fetcher.stopConnectionsToAllBrokers
-    fetcher.startConnections(topicInfos, cluster, null)
+    fetcher.startConnections(topicInfos, cluster)
   }
 
   override def tearDown() {
diff --git core/src/test/scala/unit/kafka/utils/UtilsTest.scala core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index 7e75f8f..4e7c26e 100644
--- core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -20,6 +20,7 @@ package kafka.utils
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
+import junit.framework.Assert._
 
 class UtilsTest extends JUnitSuite {
   
@@ -29,5 +30,5 @@ class UtilsTest extends JUnitSuite {
   def testSwallow() {
     Utils.swallow(logger.info, throw new IllegalStateException("test"))
   }
-  
+
 }
diff --git examples/src/main/java/kafka/examples/Consumer.java examples/src/main/java/kafka/examples/Consumer.java
index 18b7348..f78f36c 100644
--- examples/src/main/java/kafka/examples/Consumer.java
+++ examples/src/main/java/kafka/examples/Consumer.java
@@ -16,16 +16,17 @@
  */
 package kafka.examples;
 
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaMessageStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.Message;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
 
 public class Consumer extends Thread
 {
diff --git system_test/embedded_consumer/bin/run-test.sh system_test/embedded_consumer/bin/run-test.sh
index e11fe27..36a3daa 100755
--- system_test/embedded_consumer/bin/run-test.sh
+++ system_test/embedded_consumer/bin/run-test.sh
@@ -14,8 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-readonly num_messages=400000
-readonly message_size=400
+readonly num_messages=4000
+readonly message_size=100
 readonly action_on_fail="proceed"
 
 readonly test_start_time="$(date +%s)"
@@ -125,7 +125,7 @@ shutdown_servers() {
 start_producer() {
     topic=$1
     info "start producing messages for topic $topic ..."
-    $base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
+    $base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
     pid_producer=$!
 }
 
diff --git system_test/mirror_maker/README system_test/mirror_maker/README
new file mode 100644
index 0000000..da53c14
--- /dev/null
+++ system_test/mirror_maker/README
@@ -0,0 +1,22 @@
+This test replicates messages from two source kafka clusters into one target
+kafka cluster using the mirror-maker tool.  At the end, the messages produced
+at the source brokers should match those at the target brokers.
+
+To run this test, do
+bin/run-test.sh
+
+In the event of failure, by default the brokers and zookeepers remain running
+to make it easier to debug the issue - hit Ctrl-C to shut them down. You can
+change this behavior by setting the action_on_fail flag in the script to "exit"
+or "proceed", in which case a snapshot of all the logs and directories is
+placed in the test's base directory.
+
+It is a good idea to run the test in a loop. E.g.:
+
+:>/tmp/mirrormaker_test.log
+for i in {1..10}; do echo "run $i"; ./bin/run-test.sh >> /tmp/mirrormaker_test.log 2>&1; done
+tail -F /tmp/mirrormaker_test.log
+
+grep -ic passed /tmp/mirrormaker_test.log
+grep -ic failed /tmp/mirrormaker_test.log
+
diff --git system_test/mirror_maker/bin/expected.out system_test/mirror_maker/bin/expected.out
new file mode 100644
index 0000000..0a1bbaf
--- /dev/null
+++ system_test/mirror_maker/bin/expected.out
@@ -0,0 +1,18 @@
+start the servers ...
+start producing messages ...
+wait for consumer to finish consuming ...
+[2011-05-17 14:49:11,605] INFO Creating async producer for broker id = 2 at localhost:9091 (kafka.producer.ProducerPool)
+[2011-05-17 14:49:11,606] INFO Creating async producer for broker id = 1 at localhost:9092 (kafka.producer.ProducerPool)
+[2011-05-17 14:49:11,607] INFO Creating async producer for broker id = 3 at localhost:9090 (kafka.producer.ProducerPool)
+thread 0: 400000 messages sent 3514012.1233 nMsg/sec 3.3453 MBs/sec
+[2011-05-17 14:49:34,382] INFO Closing all async producers (kafka.producer.ProducerPool)
+[2011-05-17 14:49:34,383] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+[2011-05-17 14:49:34,384] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+[2011-05-17 14:49:34,385] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+Total Num Messages: 400000 bytes: 79859641 in 22.93 secs
+Messages/sec: 17444.3960
+MB/sec: 3.3214
+test passed
+stopping the servers
+bin/../../../bin/zookeeper-server-start.sh: line 9: 22584 Terminated              $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
+bin/../../../bin/zookeeper-server-start.sh: line 9: 22585 Terminated              $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
diff --git system_test/mirror_maker/bin/run-test.sh system_test/mirror_maker/bin/run-test.sh
new file mode 100755
index 0000000..a71237e
--- /dev/null
+++ system_test/mirror_maker/bin/run-test.sh
@@ -0,0 +1,357 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+readonly num_messages=10000
+readonly message_size=100
+readonly action_on_fail="proceed"
+# readonly action_on_fail="wait"
+
+readonly test_start_time="$(date +%s)"
+
+readonly base_dir=$(dirname $0)/..
+
+info() {
+    echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
+}
+
+kill_child_processes() {
+    isTopmost=$1
+    curPid=$2
+    childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')
+    for childPid in $childPids
+    do
+        kill_child_processes 0 $childPid
+    done
+    if [ $isTopmost -eq 0 ]; then
+        kill -15 $curPid 2> /dev/null
+    fi
+}
+
+cleanup() {
+    info "cleaning up"
+
+    pid_zk_source1=
+    pid_zk_source2=
+    pid_zk_target=
+    pid_kafka_source_1_1=
+    pid_kafka_source_1_2=
+    pid_kafka_source_2_1=
+    pid_kafka_source_2_2=
+    pid_kafka_target_1_1=
+    pid_kafka_target_1_2=
+    pid_producer=
+    pid_mirrormaker_1=
+    pid_mirrormaker_2=
+
+    rm -rf /tmp/zookeeper*
+
+    rm -rf /tmp/kafka*
+}
+
+begin_timer() {
+    t_begin=$(date +%s)
+}
+
+end_timer() {
+    t_end=$(date +%s)
+}
+
+start_zk() {
+    info "starting zookeepers"
+    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source_1.properties 2>&1 > $base_dir/zookeeper_source-1.log &
+    pid_zk_source1=$!
+    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source_2.properties 2>&1 > $base_dir/zookeeper_source-2.log &
+    pid_zk_source2=$!
+    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log &
+    pid_zk_target=$!
+}
+
+start_source_servers() {
+    info "starting source cluster"
+
+    JMX_PORT=1111 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_1_1.properties 2>&1 > $base_dir/kafka_source-1-1.log &
+    pid_kafka_source_1_1=$!
+    JMX_PORT=2222 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_1_2.properties 2>&1 > $base_dir/kafka_source-1-2.log &
+    pid_kafka_source_1_2=$!
+    JMX_PORT=3333 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_2_1.properties 2>&1 > $base_dir/kafka_source-2-1.log &
+    pid_kafka_source_2_1=$!
+    JMX_PORT=4444 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_2_2.properties 2>&1 > $base_dir/kafka_source-2-2.log &
+    pid_kafka_source_2_2=$!
+}
+
+start_target_servers() {
+    info "starting mirror cluster"
+    JMX_PORT=5555 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target_1_1.properties 2>&1 > $base_dir/kafka_target-1-1.log &
+    pid_kafka_target_1_1=$!
+    JMX_PORT=6666 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target_1_2.properties 2>&1 > $base_dir/kafka_target-1-2.log &
+    pid_kafka_target_1_2=$!
+}
+
+shutdown_servers() {
+    info "stopping mirror-maker"
+    if [ "x${pid_mirrormaker_1}" != "x" ]; then kill_child_processes 0 ${pid_mirrormaker_1}; fi
+    # sleep to avoid rebalancing during shutdown
+    sleep 2
+    if [ "x${pid_mirrormaker_2}" != "x" ]; then kill_child_processes 0 ${pid_mirrormaker_2}; fi
+
+    info "stopping producer"
+    if [ "x${pid_producer}" != "x" ]; then kill_child_processes 0 ${pid_producer}; fi
+
+    info "shutting down target servers"
+    if [ "x${pid_kafka_target_1_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target_1_1}; fi
+    if [ "x${pid_kafka_target_1_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target_1_2}; fi
+    sleep 2
+
+    info "shutting down source servers"
+    if [ "x${pid_kafka_source_1_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_1_1}; fi
+    if [ "x${pid_kafka_source_1_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_1_2}; fi
+    if [ "x${pid_kafka_source_2_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_2_1}; fi
+    if [ "x${pid_kafka_source_2_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_2_2}; fi
+
+    info "shutting down zookeeper servers"
+    if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi
+    if [ "x${pid_zk_source1}" != "x" ]; then kill_child_processes 0 ${pid_zk_source1}; fi
+    if [ "x${pid_zk_source2}" != "x" ]; then kill_child_processes 0 ${pid_zk_source2}; fi
+}
+
+start_producer() {
+    topic=$1
+    zk=$2
+    info "start producing messages for topic $topic to zookeeper $zk ..."
+    $base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo zk.connect=$zk --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
+    pid_producer=$!
+}
+
+# Usage: wait_partition_done ([kafka-server] [topic] [partition-id])+
+wait_partition_done() {
+    n_tuples=$(($# / 3))
+
+    i=1
+    while (($#)); do
+        kafka_server[i]=$1
+        topic[i]=$2
+        partitionid[i]=$3
+        prev_offset[i]=0
+        info "\twaiting for partition on server ${kafka_server[i]}, topic ${topic[i]}, partition ${partitionid[i]}"
+        i=$((i+1))
+        shift 3
+    done
+
+    all_done=0
+
+    # set -x
+    while [[ $all_done != 1 ]]; do
+        sleep 4
+        i=$n_tuples
+        all_done=1
+        for ((i=1; i <= $n_tuples; i++)); do
+            cur_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server ${kafka_server[i]} --topic ${topic[i]} --partition ${partitionid[i]} --time -1 --offsets 1 | tail -1)
+            if [ "x$cur_size" != "x${prev_offset[i]}" ]; then
+                all_done=0
+                prev_offset[i]=$cur_size
+            fi
+        done
+    done
+
+}
+
+cmp_logs() {
+    topic=$1
+    info "comparing source and target logs for topic $topic"
+    source_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    source_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    source_part2_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    source_part3_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    target_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    target_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9095 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
+    if [ "x$source_part0_size" == "x" ]; then source_part0_size=0; fi
+    if [ "x$source_part1_size" == "x" ]; then source_part1_size=0; fi
+    if [ "x$source_part2_size" == "x" ]; then source_part2_size=0; fi
+    if [ "x$source_part3_size" == "x" ]; then source_part3_size=0; fi
+    if [ "x$target_part0_size" == "x" ]; then target_part0_size=0; fi
+    if [ "x$target_part1_size" == "x" ]; then target_part1_size=0; fi
+    expected_size=$(($source_part0_size + $source_part1_size + $source_part2_size + $source_part3_size))
+    actual_size=$(($target_part0_size + $target_part1_size))
+    if [ "x$expected_size" != "x$actual_size" ]
+    then
+        info "source size: $expected_size target size: $actual_size"
+        return 1
+    else
+        return 0
+    fi
+}
+
+take_fail_snapshot() {
+    snapshot_dir="$base_dir/failed-${snapshot_prefix}-${test_start_time}"
+    mkdir $snapshot_dir
+    for dir in /tmp/zookeeper_source{1..2} /tmp/zookeeper_target /tmp/kafka-source-{1..2}-{1..2}-logs /tmp/kafka-target{1..2}-logs; do
+        if [ -d $dir ]; then
+            cp -r $dir $snapshot_dir
+        fi
+    done
+}
+
+# Usage: process_test_result <result> <action_on_fail>
+# result: last test result
+# action_on_fail: (exit|wait|proceed)
+# ("wait" is useful if you want to troubleshoot using zookeeper)
+process_test_result() {
+    result=$1
+    if [ $1 -eq 0 ]; then
+        info "test passed"
+    else
+        info "test failed"
+        case "$2" in
+            "wait") info "waiting: hit Ctrl-c to quit"
+                wait
+                ;;
+            "exit") shutdown_servers
+                take_fail_snapshot
+                exit $result
+                ;;
+            *) shutdown_servers
+                take_fail_snapshot
+                info "proceeding"
+                ;;
+        esac
+    fi
+}
+
+test_whitelists() {
+    info "### Testing whitelists"
+    snapshot_prefix="whitelist-test"
+
+    cleanup
+    start_zk
+    start_source_servers
+    start_target_servers
+    sleep 4
+
+    info "starting mirror makers"
+    JMX_PORT=7777 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/whitelisttest_1.consumer.properties --consumer-config $base_dir/config/whitelisttest_2.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num-streams 4 2>&1 > $base_dir/kafka_mirrormaker_1.log &
+    pid_mirrormaker_1=$!
+    JMX_PORT=8888 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/whitelisttest_1.consumer.properties --consumer-config $base_dir/config/whitelisttest_2.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num-streams 4 2>&1 > $base_dir/kafka_mirrormaker_2.log &
+    pid_mirrormaker_2=$!
+
+    begin_timer
+
+    start_producer whitetopic01 localhost:2181
+    start_producer whitetopic01 localhost:2182
+    info "waiting for whitetopic01 producers to finish producing ..."
+    wait_partition_done kafka://localhost:9090 whitetopic01 0 kafka://localhost:9091 whitetopic01 0 kafka://localhost:9092 whitetopic01 0 kafka://localhost:9093 whitetopic01 0
+
+    start_producer whitetopic02 localhost:2181
+    start_producer whitetopic03 localhost:2181
+    start_producer whitetopic04 localhost:2182
+    info "waiting for whitetopic02,whitetopic03,whitetopic04 producers to finish producing ..."
+    wait_partition_done kafka://localhost:9090 whitetopic02 0 kafka://localhost:9091 whitetopic02 0 kafka://localhost:9090 whitetopic03 0 kafka://localhost:9091 whitetopic03 0 kafka://localhost:9092 whitetopic04 0 kafka://localhost:9093 whitetopic04 0
+
+    start_producer blacktopic01 localhost:2182
+    info "waiting for blacktopic01 producer to finish producing ..."
+    wait_partition_done kafka://localhost:9092 blacktopic01 0 kafka://localhost:9093 blacktopic01 0
+
+    info "waiting for consumer to finish consuming ..."
+
+    wait_partition_done kafka://localhost:9094 whitetopic01 0 kafka://localhost:9095 whitetopic01 0 kafka://localhost:9094 whitetopic02 0 kafka://localhost:9095 whitetopic02 0 kafka://localhost:9094 whitetopic03 0 kafka://localhost:9095 whitetopic03 0 kafka://localhost:9094 whitetopic04 0 kafka://localhost:9095 whitetopic04 0
+
+    end_timer
+    info "embedded consumer took $((t_end - t_begin)) seconds"
+
+    sleep 2
+
+    # if [[ -d /tmp/kafka-target-1-1-logs/blacktopic01 || /tmp/kafka-target-1-2-logs/blacktopic01 ]]; then
+    #     echo "blacktopic01 found on target cluster"
+    #     result=1
+    # else
+    #     cmp_logs whitetopic01 && cmp_logs whitetopic02 && cmp_logs whitetopic03 && cmp_logs whitetopic04
+    #     result=$?
+    # fi
+
+    cmp_logs blacktopic01
+
+    cmp_logs whitetopic01 && cmp_logs whitetopic02 && cmp_logs whitetopic03 && cmp_logs whitetopic04
+    result=$?
+
+    return $result
+}
+
+test_blacklists() {
+    info "### Testing blacklists"
+    snapshot_prefix="blacklist-test"
+    cleanup
+    start_zk
+    start_source_servers
+    start_target_servers
+    sleep 4
+
+    info "starting mirror maker"
+    $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer-config $base_dir/config/blacklisttest.consumer.properties --producer-config $base_dir/config/mirror_producer.properties --blacklist="black.*" --num-streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
+    pid_mirrormaker_1=$!
+
+    start_producer blacktopic01 localhost:2181
+    start_producer blacktopic02 localhost:2181
+    info "waiting for producer to finish producing blacktopic01,blacktopic02 ..."
+    wait_partition_done kafka://localhost:9090 blacktopic01 0 kafka://localhost:9091 blacktopic01 0 kafka://localhost:9090 blacktopic02 0 kafka://localhost:9091 blacktopic02 0
+
+    begin_timer
+
+    start_producer whitetopic01 localhost:2181
+    info "waiting for producer to finish producing whitetopic01 ..."
+    wait_partition_done kafka://localhost:9090 whitetopic01 0 kafka://localhost:9091 whitetopic01 0
+
+    info "waiting for consumer to finish consuming ..."
+    wait_partition_done kafka://localhost:9094 whitetopic01 0 kafka://localhost:9095 whitetopic01 0
+
+    end_timer
+
+    info "embedded consumer took $((t_end - t_begin)) seconds"
+
+    sleep 2
+
+    cmp_logs blacktopic01 || cmp_logs blacktopic02
+    if [ $? -eq 0 ]; then
+        return 1
+    fi
+    
+    cmp_logs whitetopic01
+    return $?
+}
+
+# main test begins
+
+echo "Test-$test_start_time"
+
+# Ctrl-c trap. Catches INT signal
+trap "shutdown_servers; exit 0" INT
+
+test_whitelists
+result=$?
+
+process_test_result $result $action_on_fail
+
+shutdown_servers
+ 
+sleep 2
+ 
+test_blacklists
+result=$?
+
+process_test_result $result $action_on_fail
+
+shutdown_servers
+
+exit $result
+
diff --git system_test/mirror_maker/config/blacklisttest.consumer.properties system_test/mirror_maker/config/blacklisttest.consumer.properties
new file mode 100644
index 0000000..2c4c283
--- /dev/null
+++ system_test/mirror_maker/config/blacklisttest.consumer.properties
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+
diff --git system_test/mirror_maker/config/mirror_producer.properties system_test/mirror_maker/config/mirror_producer.properties
new file mode 100644
index 0000000..5940c24
--- /dev/null
+++ system_test/mirror_maker/config/mirror_producer.properties
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2183
+# broker.list=1:localhost:9094,2:localhost:9095
+
+# timeout in ms for connecting to zookeeper
+# zk.connectiontimeout.ms=1000000
+
+producer.type=async
+
+# to avoid dropping events if the queue is full, wait indefinitely
+queue.enqueueTimeout.ms=-1
+
diff --git system_test/mirror_maker/config/server_source_1_1.properties system_test/mirror_maker/config/server_source_1_1.properties
new file mode 100644
index 0000000..d89c4fb
--- /dev/null
+++ system_test/mirror_maker/config/server_source_1_1.properties
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=1
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9090
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source-1-1-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=10000000
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flasher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
diff --git system_test/mirror_maker/config/server_source_1_2.properties system_test/mirror_maker/config/server_source_1_2.properties
new file mode 100644
index 0000000..063d68b
--- /dev/null
+++ system_test/mirror_maker/config/server_source_1_2.properties
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=2
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9091
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source-1-2-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
diff --git system_test/mirror_maker/config/server_source_2_1.properties system_test/mirror_maker/config/server_source_2_1.properties
new file mode 100644
index 0000000..998b460
--- /dev/null
+++ system_test/mirror_maker/config/server_source_2_1.properties
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=1
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9092
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source-2-1-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
diff --git system_test/mirror_maker/config/server_source_2_2.properties system_test/mirror_maker/config/server_source_2_2.properties
new file mode 100644
index 0000000..81427ae
--- /dev/null
+++ system_test/mirror_maker/config/server_source_2_2.properties
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=2
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9093
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source-2-2-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
diff --git system_test/mirror_maker/config/server_target_1_1.properties system_test/mirror_maker/config/server_target_1_1.properties
new file mode 100644
index 0000000..0265f4e
--- /dev/null
+++ system_test/mirror_maker/config/server_target_1_1.properties
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=1
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9094
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-target-1-1-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2183
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4
diff --git system_test/mirror_maker/config/server_target_1_2.properties system_test/mirror_maker/config/server_target_1_2.properties
new file mode 100644
index 0000000..a31e9ca
--- /dev/null
+++ system_test/mirror_maker/config/server_target_1_2.properties
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=2
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9095
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-target-1-2-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2183
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4
diff --git system_test/mirror_maker/config/whitelisttest_1.consumer.properties system_test/mirror_maker/config/whitelisttest_1.consumer.properties
new file mode 100644
index 0000000..2c4c283
--- /dev/null
+++ system_test/mirror_maker/config/whitelisttest_1.consumer.properties
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+
diff --git system_test/mirror_maker/config/whitelisttest_2.consumer.properties system_test/mirror_maker/config/whitelisttest_2.consumer.properties
new file mode 100644
index 0000000..6cbcdb4
--- /dev/null
+++ system_test/mirror_maker/config/whitelisttest_2.consumer.properties
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+
diff --git system_test/mirror_maker/config/zookeeper_source_1.properties system_test/mirror_maker/config/zookeeper_source_1.properties
new file mode 100644
index 0000000..f851796
--- /dev/null
+++ system_test/mirror_maker/config/zookeeper_source_1.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_source-1
+# the port at which the clients will connect
+clientPort=2181
diff --git system_test/mirror_maker/config/zookeeper_source_2.properties system_test/mirror_maker/config/zookeeper_source_2.properties
new file mode 100644
index 0000000..d534d18
--- /dev/null
+++ system_test/mirror_maker/config/zookeeper_source_2.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_source-2
+# the port at which the clients will connect
+clientPort=2182
diff --git system_test/mirror_maker/config/zookeeper_target.properties system_test/mirror_maker/config/zookeeper_target.properties
new file mode 100644
index 0000000..55a7eb1
--- /dev/null
+++ system_test/mirror_maker/config/zookeeper_target.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_target
+# the port at which the clients will connect
+clientPort=2183
