diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e3944d5..fe57433 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -397,6 +397,30 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
     }
 
+    /** Attempt a rebalance restricted to a single topic; on exception or an
+      * unsuccessful partial attempt, fall back to exactly one full rebalance. */
+    def syncedPartialRebalance(topic: String) {
+      rebalanceLock synchronized {
+        info("begin rebalancing consumer " + consumerIdString + " for a single topic " + topic + " with only one trial")
+        var done = false
+        val cluster = getCluster(zkClient)
+        try {
+          done = rebalancePartial(cluster, topic)
+        } catch {
+          case e: Throwable =>
+            // 'done' stays false, so the fallback below runs the full rebalance exactly once
+            // (previously syncedRebalance was also called here, running it twice on error).
+            info("exception during partial rebalance. Trigger the full rebalance attempt", e)
+        }
+        info("end rebalancing consumer " + consumerIdString + " for a single topic " + topic)
+        if (!done) {
+          /* Here the cache is at a risk of being stale. To take future rebalancing
+           * decisions correctly, fall back to a full rebalance. */
+          info("Partial rebalancing attempt failed. Trigger the full rebalance attempt")
+          syncedRebalance
+        }
+      }
+    }
+
     private def rebalance(cluster: Cluster): Boolean = {
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
@@ -489,6 +513,92 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       }
     }
 
+    /** Try to rebalance only the given topic. Returns true on success (or when a full rebalance
+      * is already pending / no brokers are up yet); false means the caller must do a full rebalance. */
+    private def rebalancePartial(cluster: Cluster, topic: String): Boolean = {
+      // First check if a full rebalance is on the way, if yes, returns immediately
+      if (isWatcherTriggered) return true
+      val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
+      val myTopicThreadIds = myTopicThreadIdsMap.getOrElse(topic, Set.empty) // no Option.get: a missing entry fails the partial attempt instead of throwing
+      val consumersForTopic = getConsumersPerTopic(zkClient, group).getOrElse(topic, List.empty)
+      if (myTopicThreadIds.isEmpty || consumersForTopic.isEmpty) {
+        // If the consumers for the new topic cannot be constructed, then fail directly
+        return false
+      }
+
+      val brokers = getAllBrokersInCluster(zkClient)
+      if (brokers.isEmpty) {
+        // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
+        // We log a warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
+        // are up.
+        warn("no brokers found when trying to rebalance.")
+        zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
+        true
+      }
+      else {
+        val partitionOwnershipDecision = new collection.mutable.HashMap[(String, Int), String]()
+        val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
+
+        val topicMetadata = ClientUtils.fetchTopicMetadata(Set(topic),
+          brokers,
+          config.clientId,
+          config.socketTimeoutMs,
+          correlationId.getAndIncrement).topicsMetadata
+        if (topicMetadata.isEmpty) return false // guard: never call head on an empty metadata list
+        val partitionsForTopic = topicMetadata.head
+        val partitions: Seq[Int] = partitionsForTopic.partitionsMetadata.map(m => m.partitionId)
+
+        currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
+
+        val topicDirs = new ZKGroupTopicDirs(group, topic)
+        val nPartsPerConsumer = partitions.size / consumersForTopic.size
+        val nConsumersWithExtraPart = partitions.size % consumersForTopic.size
+
+        info("Consumer " + consumerIdString + " rebalancing the following partitions: " + partitions +
+          " for topic " + topic + " with consumers: " + consumersForTopic)
+
+        for (consumerThreadId <- myTopicThreadIds) {
+          val myConsumerPosition = consumersForTopic.indexOf(consumerThreadId) // indexOf replaces deprecated findIndexOf
+          assert(myConsumerPosition >= 0)
+          val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
+          val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
+
+          /**
+           *   Range-partition the sorted partitions to consumers for better locality.
+           *  The first few consumers pick up an extra partition, if any.
+           */
+          if (nParts <= 0)
+            warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
+          else {
+            for (i <- startPart until startPart + nParts) {
+              val partition = partitions(i)
+              info(consumerThreadId + " attempting to claim partition " + partition)
+              addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
+              // record the partition ownership decision
+              partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
+            }
+          }
+        }
+
+        /**
+         * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
+         * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
+         */
+        if (reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
+          info("Updating the cache")
+          debug("Partitions for topic " + topic + "\'s cache " + partitionsForTopic)
+          debug("Consumers for topic " + topic + "\'s cache " + consumersForTopic)
+          currentTopicRegistry.foreach(m => topicRegistry.put(m._1, m._2))
+          // first stop fetchers then update them
+          closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
+          updateFetcher(cluster)
+          true
+        } else {
+          false
+        }
+      }
+    }
+
     private def closeFetchersForQueues(cluster: Cluster,
                                        messageStreams: Map[String,List[KafkaStream[_,_]]],
                                        queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
@@ -761,12 +871,56 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       wildcardTopics = updatedTopics
       info("Topics to consume = %s".format(wildcardTopics))
 
-      if (addedTopics.nonEmpty || deletedTopics.nonEmpty)
+      // If it is just one addition of a new topic, only do a partial rebalance of this topic;
+      // Otherwise do a full rebalance
+      if (addedTopics.size == 1 && !deletedTopics.nonEmpty)
+        handleAddNewTopicForFilteredStreams(addedTopics.head)
+      else if (addedTopics.nonEmpty || deletedTopics.nonEmpty)
         reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
     }
 
+    /** Wire up queues/streams for a newly matched wildcard topic, then trigger a partial rebalance. */
+    def handleAddNewTopicForFilteredStreams(topic: String) {
+      debug("Handling adding a new topic %s for wildcard filtered streams".format(topic))
+
+      // update topicThreadIdAndQueues and topicStreamsMap
+      val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
+
+      // Ordered sequence (not a HashSet): thread id "<consumerId>-i" must pair deterministically with queue i in the zip below.
+      val newConsumerIds = (0 until wildcardQueuesAndStreams.size).map(i => consumerIdString + "-" + i)
+
+      require(newConsumerIds.size == wildcardQueuesAndStreams.size,
+        "Mismatch between thread ID count (%d) and queue count (%d) for the new topic %s"
+          .format(newConsumerIds.size, wildcardQueuesAndStreams.size, topic))
+      val newThreadQueueStreamPairs = newConsumerIds.zip(wildcardQueuesAndStreams)
+
+      newThreadQueueStreamPairs.foreach(e => {
+        val topicThreadId = (topic, e._1)
+        val q = e._2._1
+        topicThreadIdAndQueues.put(topicThreadId, q)
+        debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
+        newGauge(
+          config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+          new Gauge[Int] {
+            def value = q.size
+          }
+        )
+      })
+
+      val streams = wildcardQueuesAndStreams.map(_._2).toList
+      topicStreamsMap += (topic -> streams)
+      debug("adding topic %s and %d streams to map.".format(topic, streams.size))
+
+      // register on broker partition path changes of the new topic
+      zkClient.subscribeChildChanges(BrokerTopicsPath + "/" + topic, loadBalancerListener)
+
+      // explicitly trigger partial load balancing for this topic
+      loadBalancerListener.syncedPartialRebalance(topic)
+    }
+
     def streams: Seq[KafkaStream[K,V]] =
       wildcardQueuesAndStreams.map(_._2)
   }
+
 }
 
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index fcfc583..5a0f652 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -41,6 +41,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
+  val topic2 = "topic2"
+  val topicFilter = new Whitelist("topic.")
   val configs =
     for(props <- TestUtils.createBrokerConfigs(numNodes))
     yield new KafkaConfig(props) {
@@ -151,6 +153,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
+
+
     val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2)
     assertEquals(sentMessages3.sorted, receivedMessages3.sorted)
 
@@ -165,6 +169,83 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
+  /** End-to-end check of wildcard (whitelist) consumption: an existing matching topic,
+    * a newly created matching topic, and a non-matching topic that must yield nothing. */
+  def testWildcard() {
+    val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
+    requestHandlerLogger.setLevel(Level.FATAL)
+
+    // send some messages for topic1 to each broker
+    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
+      sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
+
+    // wait to make sure the topic and partition have a leader for the successful case
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
+
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000)
+
+    // create a consumer
+    val consumerConfig1 = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer1)) {
+      override val consumerTimeoutMs = 200
+    }
+
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreamsByFilter(topicFilter, 1, new StringDecoder(), new StringDecoder())
+
+    val topicMessageStreamsMap1: mutable.Map[String,List[KafkaStream[String, String]]] = new mutable.HashMap[String,List[KafkaStream[String, String]]]
+    topicMessageStreamsMap1.put(topic, topicMessageStreams1.asInstanceOf[List[KafkaStream[String, String]]])
+
+    val receivedMessages1 = getMessages(nMessages*2, topicMessageStreamsMap1)
+
+    info("received messages for topic1: " + receivedMessages1)
+
+    assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
+
+    // send some messages for topic2 to each broker
+    val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic2, 0, nMessages) ++
+      sendMessagesToBrokerPartition(configs.last, topic2, 1, nMessages)
+
+    val topicMessageStreamsMap2: mutable.Map[String,List[KafkaStream[String, String]]] = new mutable.HashMap[String,List[KafkaStream[String, String]]]
+    topicMessageStreamsMap2.put(topic2, topicMessageStreams1.asInstanceOf[List[KafkaStream[String, String]]])
+
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 0, 500) // wait on topic2, the topic just produced to
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1, 500)
+
+    val receivedMessages2 = getMessages(nMessages*2, topicMessageStreamsMap2)
+
+    info("received messages for topic2: " + receivedMessages2)
+
+    assertEquals(sentMessages2.sorted, receivedMessages2.sorted)
+
+    // send some messages for a non-watched topic to each broker
+    val sentMessages3 = sendMessagesToBrokerPartition(configs.head, "none", 0, nMessages) ++
+      sendMessagesToBrokerPartition(configs.last, "none", 1, nMessages)
+
+    val topicMessageStreamsMap3: mutable.Map[String,List[KafkaStream[String, String]]] = new mutable.HashMap[String,List[KafkaStream[String, String]]]
+    topicMessageStreamsMap3.put("none", topicMessageStreams1.asInstanceOf[List[KafkaStream[String, String]]])
+
+    waitUntilLeaderIsElectedOrChanged(zkClient, "none", 0, 500) // wait on "none", the topic just produced to
+    waitUntilLeaderIsElectedOrChanged(zkClient, "none", 1, 500)
+
+    // no messages to consume, we should hit timeout;
+    try {
+      getMessages(nMessages*2, topicMessageStreamsMap3)
+      fail("should get an exception")
+    } catch {
+      case e: ConsumerTimeoutException => // this is ok
+      // any other exception propagates and fails the test
+    }
+
+    // receivedMessages3 would be empty: topic "none" does not match the wildcard filter
+
+    zkConsumerConnector1.shutdown
+    info("all consumer connectors stopped")
+    requestHandlerLogger.setLevel(Level.ERROR)
+  }
+
   def testCompression() {
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
@@ -344,7 +425,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
                                     partition: Int,
                                     numMessages: Int,
                                     compression: CompressionCodec = NoCompressionCodec): List[String] = {
-    val header = "test-%d-%d".format(config.brokerId, partition)
+    val header = "test-%s-".format(topic)
     val props = new Properties()
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
