diff --git a/config/server.properties b/config/server.properties
index e92f599..35c7410 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -120,4 +120,4 @@ kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
 kafka.csv.metrics.dir=/tmp/kafka_metrics
 # Disable csv reporting by default.
 kafka.csv.metrics.reporter.enabled=false
-
+default.replication.factor=2
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 9378357..c64a97c 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -59,11 +59,11 @@ object ConsoleConsumer extends Logging {
             .describedAs("gid")
             .defaultsTo("console-consumer-" + new Random().nextInt(100000))
             .ofType(classOf[String])
-    val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
+    val maxMemOpt = parser.accepts("max-memory", "The maximum amount of memory the consumer can use for fetches.")
             .withRequiredArg
             .describedAs("size")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(1024 * 1024)
+            .ofType(classOf[java.lang.Long])
+            .defaultsTo(ConsumerConfig.MaxMemory)
     val minFetchBytesOpt = parser.accepts("min-fetch-bytes", "The min number of bytes each fetch request waits for.")
             .withRequiredArg
             .describedAs("bytes")
@@ -146,7 +146,7 @@ object ConsoleConsumer extends Logging {
     val props = new Properties()
     props.put("groupid", options.valueOf(groupIdOpt))
     props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)
-    props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
+    props.put("max.memory", options.valueOf(maxMemOpt).toString)
     props.put("min.fetch.bytes", options.valueOf(minFetchBytesOpt).toString)
     props.put("max.fetch.wait.ms", options.valueOf(maxWaitMsOpt).toString)
     props.put("autocommit.enable", "true")
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 660a977..bd6c94b 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -19,17 +19,16 @@ package kafka.consumer
 
 import java.util.Properties
 import kafka.api.OffsetRequest
-import kafka.utils.{VerifiableProperties, ZKConfig}
+import kafka.utils.{DeprecatedProperty, VerifiableProperties, ZKConfig}
 
 object ConsumerConfig {
   val SocketTimeout = 30 * 1000
   val SocketBufferSize = 64*1024
-  val FetchSize = 1024 * 1024
-  val MaxFetchSize = 10*FetchSize
+  val MaxMemory = 100 * 1024 * 1024L
   val DefaultFetcherBackoffMs = 1000
   val AutoCommit = true
   val AutoCommitInterval = 10 * 1000
-  val MaxQueuedChunks = 10
+  val MaxQueuedChunks = 1
   val MaxRebalanceRetries = 4
   val AutoOffsetReset = OffsetRequest.SmallestTimeString
   val ConsumerTimeoutMs = -1
@@ -43,13 +42,17 @@ object ConsumerConfig {
   val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
   val DefaultClientId = ""
+
+  private val deprecatedProperties = Seq(
+    DeprecatedProperty(deprecated = "fetch.size", insteadOpt = Some("max.memory"))
+  )
 }
 
 class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
   import ConsumerConfig._
 
   def this(originalProps: Properties) {
-    this(new VerifiableProperties(originalProps))
+    this(new VerifiableProperties(originalProps, Some(ConsumerConfig.deprecatedProperties)))
     props.verify()
   }
 
@@ -66,9 +69,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** the socket receive buffer for network requests */
   val socketBufferSize = props.getInt("socket.buffersize", SocketBufferSize)
   
-  /** the number of byes of messages to attempt to fetch */
-  val fetchSize = props.getInt("fetch.size", FetchSize)
-  
+  /** The maximum amount of memory that this consumer can use toward fetch requests. */
+  val maxMemory = props.getLong("max.memory", props.getLong("fetch.size", MaxMemory))
+
   /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
   val autoCommit = props.getBoolean("autocommit.enable", AutoCommit)
   
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index a6cbfb6..9c2ce20 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -88,11 +88,22 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   }
   leaderFinderThread.start()
 
-
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ConsumerFetcherThread(
-      "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
-      config, sourceBroker, partitionMap, this)
+      name = "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
+      config,
+      sourceBroker,
+      partitionMap,
+      partitionFetchSize =
+        () => {
+          val requested = (config.maxMemory / (partitionCount * (1/* current fetch */ + config.maxQueuedChunks)).max(1))
+                          .toInt
+          if (requested < 0)
+            Int.MaxValue
+          else
+            requested
+        },
+      consumerFetcherManager = this)
   }
 
   def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
@@ -141,4 +152,4 @@ class ConsumerFetcherManager(private val consumerIdString: String,
     stopAllConnections()
     info("shutdown completed")
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index c902e20..9530552 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -28,13 +28,14 @@ class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
                             sourceBroker: Broker,
                             partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
+                            partitionFetchSize: () => Int,
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name, 
                                       clientId = config.clientId,
                                       sourceBroker = sourceBroker,
                                       socketTimeout = config.socketTimeoutMs,
                                       socketBufferSize = config.socketBufferSize, 
-                                      fetchSize = config.fetchSize,
+                                      partitionFetchSize,
                                       fetcherBrokerId = Request.OrdinaryConsumerId,
                                       maxWait = config.maxFetchWaitMs,
                                       minBytes = config.minFetchBytes) {
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 8c42d11..f2ac086 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -28,7 +28,6 @@ class PartitionTopicInfo(val topic: String,
                          private val chunkQueue: BlockingQueue[FetchedDataChunk],
                          private val consumedOffset: AtomicLong,
                          private val fetchedOffset: AtomicLong,
-                         private val fetchSize: AtomicInteger,
                          private val consumerTopicStats: ConsumerTopicStats) extends Logging {
 
   debug("initial consumer offset of " + this + " is " + consumedOffset.get)
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 43e9fa6..b7f55ed 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -83,7 +83,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   ClientId.validate(config.clientId)
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
-  private var fetcher: Option[ConsumerFetcherManager] = None
+  private var fetcherManager: Option[ConsumerFetcherManager] = None
   private var zkClient: ZkClient = null
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
   private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
@@ -146,7 +146,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   private def createFetcher() {
     if (enableFetcher)
-      fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient))
+      fetcherManager = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient))
   }
 
   private def connectZk() {
@@ -164,8 +164,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       try {
         if (config.autoCommit)
           scheduler.shutdownNow()
-        fetcher match {
-          case Some(f) => f.shutdown
+        fetcherManager match {
+          case Some(fm) => fm.shutdown
           case None =>
         }
         sendShutdownToAllQueues()
@@ -486,9 +486,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                        messageStreams: Map[String,List[KafkaStream[_,_]]],
                                        queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
       val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
-      fetcher match {
-        case Some(f) =>
-          f.stopAllConnections
+      fetcherManager match {
+        case Some(fm) =>
+          fm.stopAllConnections
           clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
           info("Committing all offsets after clearing the fetcher queues")
           /**
@@ -537,9 +537,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       info("Consumer " + consumerIdString + " selected partitions : " +
         allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(","))
 
-      fetcher match {
-        case Some(f) =>
-          f.startConnections(allPartitionInfos, cluster)
+      fetcherManager match {
+        case Some(fm) =>
+          fm.startConnections(allPartitionInfos, cluster)
         case None =>
       }
     }
@@ -614,7 +614,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                  queue,
                                                  consumedOffset,
                                                  fetchedOffset,
-                                                 new AtomicInteger(config.fetchSize),
                                                  consumerTopicStats)
       partTopicInfoMap.put(partition, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index e8702e2..869d3bd 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -26,6 +26,7 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
   private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "] "
+  @volatile protected var partitionCount = 0
 
   private def getFetcherId(topic: String, partitionId: Int) : Int = {
     (topic.hashCode() + 31 * partitionId) % numFetchers
@@ -34,6 +35,11 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
   // to be defined in subclass to create a specific fetcher
   def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread
 
+  private def updatePartitionCount() {
+    partitionCount = fetcherThreadMap.values.map(_.partitionCount()).sum
+    debug("Updated partition count to " + partitionCount)
+  }
+
   def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) {
     mapLock synchronized {
       var fetcherThread: AbstractFetcherThread = null
@@ -48,6 +54,7 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
       fetcherThread.addPartition(topic, partitionId, initialOffset)
       info("adding fetcher on topic %s, partion %d, initOffset %d to broker %d with fetcherId %d"
           .format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId))
+      updatePartitionCount()
     }
   }
 
@@ -57,6 +64,7 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
       for ((key, fetcher) <- fetcherThreadMap) {
         fetcher.removePartition(topic, partitionId)
       }
+      updatePartitionCount()
     }
   }
 
@@ -70,6 +78,7 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
         }
       }
       fetcherThreadMap --= keysToBeRemoved
+      updatePartitionCount()
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 6d73c82..0ce8be3 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -23,7 +23,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping}
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
 import kafka.message.MessageAndOffset
-import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
+import kafka.api.{FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
@@ -35,8 +35,9 @@ import java.util.concurrent.locks.ReentrantLock
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
-abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
-                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
+abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int,
+                                      socketBufferSize: Int, partitionFetchSize: () => Int,
+                                      fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
   extends ShutdownableThread(name) {
   private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
   private val partitionMapLock = new ReentrantLock
@@ -64,85 +65,141 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
     simpleConsumer.close()
   }
 
-  override def doWork() {
-    val fetchRequestuilder = new FetchRequestBuilder().
+  private case class TopicPartitionAndOffset(topic: String, partition: Int, offset: Long)
+  private case class FetchResult(errorPartitions: Set[TopicAndPartition],
+                                 incompletePartitions: Set[TopicPartitionAndOffset])
+
+  private def fetch(partitions: Set[TopicPartitionAndOffset],
+                    partitionFetchSize: Int,
+                    isRetry: Boolean): FetchResult = {
+    val fetchRequestBuilder = new FetchRequestBuilder().
             clientId(clientId + "-" + brokerInfo).
             replicaId(fetcherBrokerId).
             maxWait(maxWait).
             minBytes(minBytes)
 
-    partitionMapLock.lock()
+    partitions.foreach(tpo => fetchRequestBuilder.addFetch(tpo.topic, tpo.partition, tpo.offset, partitionFetchSize))
+    val fetchRequest = fetchRequestBuilder.build()
+
     try {
-      while (partitionMap.isEmpty)
-        partitionMapCond.await()
-      partitionMap.foreach {
-        case((topicAndPartition, offset)) =>
-          fetchRequestuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
-                           offset, fetchSize)
+      trace("issuing fetch request to broker %d : %s".format(sourceBroker.id, fetchRequest))
+      fetcherMetrics.requestRate.mark()
+      val response = simpleConsumer.fetch(fetchRequest)
+      // process fetched data
+      partitionMapLock.lock()
+      try {
+        /*
+         * Go through each partition in the response object.
+         * - for non-error/complete partitions, process the data.
+         * - for error and incomplete partitions, fold them into a
+         *   single FetchResult.
+         */
+        response.data.foldLeft(FetchResult(Set.empty, Set.empty))((foldedFetchResults, curr) => {
+          val topicAndPartition = curr._1
+          val partitionData = curr._2
+          val (topic, partitionId) = topicAndPartition.asTuple
+          val currentOffset = partitionMap.get(topicAndPartition)
+          if (currentOffset.isDefined) {
+            partitionData.error match {
+              case ErrorMapping.NoError =>
+                val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+                val validBytes = messages.validBytes
+                val newOffset = messages.lastOption match {
+                  case Some(m: MessageAndOffset) => m.nextOffset
+                  case None => currentOffset.get
+                }
+                partitionMap.put(topicAndPartition, newOffset)
+                fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
+                fetcherMetrics.byteRate.mark(validBytes)
+
+                if (validBytes > 0 || isRetry /* so ConsumerIterator can detect this */)
+                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
+                  processPartitionData(topicAndPartition, currentOffset.get, partitionData)
+
+                if (validBytes == 0)
+                  FetchResult(foldedFetchResults.errorPartitions,
+                              foldedFetchResults.incompletePartitions +
+                                      TopicPartitionAndOffset(topic, partitionId, currentOffset.get))
+                else
+                  foldedFetchResults
+              case ErrorMapping.OffsetOutOfRangeCode =>
+                val newOffset = handleOffsetOutOfRange(topicAndPartition)
+                partitionMap.put(topicAndPartition, newOffset)
+                warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
+                             .format(currentOffset.get, topic, partitionId, newOffset))
+                foldedFetchResults
+              case _ =>
+                warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
+                      ErrorMapping.exceptionFor(partitionData.error))
+                FetchResult(foldedFetchResults.errorPartitions + topicAndPartition,
+                            foldedFetchResults.incompletePartitions)
+            }
+          } else {
+            foldedFetchResults
+          }
+        })
+      }
+      finally {
+        partitionMapLock.unlock()
       }
-    } finally {
-      partitionMapLock.unlock()
     }
-
-    val fetchRequest = fetchRequestuilder.build()
-    val partitionsWithError = new mutable.HashSet[TopicAndPartition]
-    var response: FetchResponse = null
-    try {
-      trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
-      response = simpleConsumer.fetch(fetchRequest)
-    } catch {
-      case t =>
+    catch {
+      case t: Throwable =>
         debug("error in fetch %s".format(fetchRequest), t)
         if (isRunning.get) {
-          partitionMapLock synchronized {
-            partitionsWithError ++= partitionMap.keys
-          }
+          FetchResult(partitions.map(e => TopicAndPartition(e.topic, e.partition)), Set.empty)
+        } else {
+          FetchResult(Set.empty, Set.empty)
         }
+
     }
-    fetcherMetrics.requestRate.mark()
 
-    if (response != null) {
-      // process fetched data
-      partitionMapLock.lock()
+  }
+
+  override def doWork() {
+    partitionMapLock.lock()
+    val (partitionsToFetch: Set[TopicPartitionAndOffset], initialFetchSize, retryFetchSize) =
       try {
-        response.data.foreach {
-          case(topicAndPartition, partitionData) =>
-            val (topic, partitionId) = topicAndPartition.asTuple
-            val currentOffset = partitionMap.get(topicAndPartition)
-            if (currentOffset.isDefined) {
-              partitionData.error match {
-                case ErrorMapping.NoError =>
-                  val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
-                  val validBytes = messages.validBytes
-                  val newOffset = messages.lastOption match {
-                    case Some(m: MessageAndOffset) => m.nextOffset
-                    case None => currentOffset.get
-                  }
-                  partitionMap.put(topicAndPartition, newOffset)
-                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
-                  fetcherMetrics.byteRate.mark(validBytes)
-                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                  processPartitionData(topicAndPartition, currentOffset.get, partitionData)
-                case ErrorMapping.OffsetOutOfRangeCode =>
-                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
-                  partitionMap.put(topicAndPartition, newOffset)
-                  warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
-                    .format(currentOffset.get, topic, partitionId, newOffset))
-                case _ =>
-                  warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
-                    ErrorMapping.exceptionFor(partitionData.error))
-                  partitionsWithError += topicAndPartition
-              }
-            }
+        while (partitionMap.isEmpty)
+          partitionMapCond.await()
+        val partitions = partitionMap.iterator.map {
+          case (topicAndPartition, offset) =>
+            TopicPartitionAndOffset(topicAndPartition.topic, topicAndPartition.partition, offset)
+        }.toSet
+        val currPartitionFetchSize = partitionFetchSize()
+        val aggregateFetchSize = {
+          val requested = currPartitionFetchSize * partitionCount()
+          if (requested < 0)
+            Int.MaxValue
+          else
+            requested
         }
-      } finally {
-        partitionMapLock.unlock()
+        (partitions, currPartitionFetchSize, aggregateFetchSize)
       }
+    finally {
+      partitionMapLock.unlock()
     }
 
-    if(partitionsWithError.size > 0) {
-      debug("handling partitions with error for %s".format(partitionsWithError))
-      handlePartitionsWithErrors(partitionsWithError)
+    val FetchResult(errorPartitions1, incompletePartitions1) =
+      fetch(partitionsToFetch, initialFetchSize, isRetry = false)
+
+    val FetchResult(errorPartitions2, incompletePartitions2) = if (incompletePartitions1.nonEmpty) {
+      logger.debug("Retrying fetch serially with fetch size %d for incomplete partitions %s."
+                          .format(retryFetchSize, incompletePartitions1))
+      incompletePartitions1.foldLeft(FetchResult(Set.empty, Set.empty))((foldedFetchResults, curr) => {
+        val thisFetchResult = fetch(Set(curr), retryFetchSize, isRetry = true)
+        FetchResult(foldedFetchResults.errorPartitions ++ thisFetchResult.errorPartitions,
+                    foldedFetchResults.incompletePartitions ++ thisFetchResult.incompletePartitions)
+      })
+    } else
+      FetchResult(Set.empty, Set.empty)
+    if (incompletePartitions2.nonEmpty)
+      logger.debug("After retrying fetch, the following partitions are still incomplete: " + incompletePartitions2)
+
+    val allErrorPartitions = errorPartitions1 ++ errorPartitions2
+    if(allErrorPartitions.nonEmpty) {
+      debug("handling partitions with error for %s".format(allErrorPartitions))
+      handlePartitionsWithErrors(allErrorPartitions)
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3488908..846404c 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -21,7 +21,13 @@ import java.util.Properties
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
 import java.net.InetAddress
-import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
+import kafka.utils.{DeprecatedProperty, VerifiableProperties, ZKConfig, Utils}
+
+object KafkaConfig {
+  private val deprecatedProperties = Seq(
+    DeprecatedProperty(deprecated = "replica.fetch.size", insteadOpt = Some("replica.fetchers.max.memory"))
+  )
+}
 
 /**
  * Configuration settings for the kafka server
@@ -29,13 +35,13 @@ import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
 class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
 
   def this(originalProps: Properties) {
-    this(new VerifiableProperties(originalProps))
+    this(new VerifiableProperties(originalProps, Some(KafkaConfig.deprecatedProperties)))
   }
 
   def verify() = props.verify()
   
   /*********** General Configuration ***********/
-  
+
   /* the broker id for this server */
   val brokerId: Int = props.getIntInRange("brokerid", (0, Int.MaxValue))
 
@@ -146,8 +152,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the socket receive buffer for network requests */
   val replicaSocketBufferSize = props.getInt("replica.socket.buffersize", ConsumerConfig.SocketBufferSize)
 
-  /* the number of byes of messages to attempt to fetch */
-  val replicaFetchSize = props.getInt("replica.fetch.size", ConsumerConfig.FetchSize)
+  /** The maximum amount of memory that the replica fetchers can use toward fetch requests. */
+  val replicaMaxMem =
+    props.getLong("replica.fetchers.max.memory", props.getLong("replica.fetch.size", ConsumerConfig.MaxMemory))
 
   /* max wait time for each fetcher request issued by follower replicas*/
   val replicaMaxWaitTimeMs = props.getInt("replica.fetch.wait.time.ms", 500)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 9f696dd..390873f 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -23,7 +23,18 @@ class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val r
         extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
-    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
+    new ReplicaFetcherThread(name = "ReplicaFetcherThread-%d-%d-on-broker-%d".format(sourceBroker.id, fetcherId, brokerConfig.brokerId),
+                             sourceBroker,
+                             brokerConfig,
+                             partitionFetchSize =
+                                () => {
+                                  val requested = (brokerConfig.replicaMaxMem / partitionCount.max(1)).toInt
+                                  if (requested < 0)
+                                    Int.MaxValue
+                                  else
+                                    requested
+                                },
+                             replicaMgr)
   }
 
   def shutdown() {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index c1d3235..f94343c 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -26,13 +26,14 @@ import kafka.api.{FetchRequest, PartitionOffsetRequestInfo, OffsetRequest, Fetch
 class ReplicaFetcherThread(name:String,
                            sourceBroker: Broker,
                            brokerConfig: KafkaConfig,
+                           partitionFetchSize: () => Int,
                            replicaMgr: ReplicaManager)
   extends AbstractFetcherThread(name = name,
                                 clientId = FetchRequest.ReplicaFetcherClientId,
                                 sourceBroker = sourceBroker,
                                 socketTimeout = brokerConfig.replicaSocketTimeoutMs,
                                 socketBufferSize = brokerConfig.replicaSocketBufferSize,
-                                fetchSize = brokerConfig.replicaFetchSize,
+                                partitionFetchSize,
                                 fetcherBrokerId = brokerConfig.brokerId,
                                 maxWait = brokerConfig.replicaMaxWaitTimeMs,
                                 minBytes = brokerConfig.replicaMinBytes) {
@@ -43,6 +44,9 @@ class ReplicaFetcherThread(name:String,
     val partitionId = topicAndPartition.partition
     val replica = replicaMgr.getReplica(topic, partitionId).get
     val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+    if (messageSet.sizeInBytes > 0 && messageSet.validBytes == 0)
+      warn("Follower %d stuck for partition %s due to insufficient fetch size (%d)."
+           .format(replica.brokerId, topicAndPartition.toString, brokerConfig.replicaMaxMem))
 
     if (fetchOffset != replica.logEndOffset)
       throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset))
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index db14c82..20ba1b8 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -46,7 +46,7 @@ object ReplayLogProducer extends Logging {
     consumerProps.put("zk.connect", config.zkConnect)
     consumerProps.put("consumer.timeout.ms", "10000")
     consumerProps.put("autooffset.reset", OffsetRequest.SmallestTimeString)
-    consumerProps.put("fetch.size", (1024*1024).toString)
+    consumerProps.put("max.memory", (1024*1024).toString)
     consumerProps.put("socket.buffer.size", (2 * 1024 * 1024).toString)
     val consumerConfig = new ConsumerConfig(consumerProps)
     val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index d694ba9..fe6108d 100644
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -20,7 +20,20 @@ package kafka.utils
 import java.util.Properties
 import scala.collection._
 
-class VerifiableProperties(val props: Properties) extends Logging {
+
+/**
+ * Convenience class to warn users of deprecated properties.
+ * @param deprecated The deprecated property.
+ * @param insteadOpt (optional) The new property that users should switch to.
+ */
+case class DeprecatedProperty(deprecated: String, insteadOpt: Option[String] = None) {
+  override def toString = "The %s property has been deprecated.%s"
+                          .format(deprecated,
+                                  if (insteadOpt.isDefined) " (Use %s instead.)".format(insteadOpt.get) else "")
+}
+
+class VerifiableProperties(val props: Properties, val deprecationsOpt: Option[Seq[DeprecatedProperty]] = None)
+        extends Logging {
   private val referenceSet = mutable.HashSet[String]()
   
   def this() = this(new Properties)
@@ -32,7 +45,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
   def getProperty(name: String): String = {
     val value = props.getProperty(name)
     referenceSet.add(name)
-    return value
+    value
   }
 
   /**
@@ -40,7 +53,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
    */
   def getInt(name: String): Int = {
     require(containsKey(name), "Missing required property '" + name + "'")
-    return getInt(name, -1)
+    getInt(name, -1)
   }
 
   def getIntInRange(name: String, range: (Int, Int)): Int = {
@@ -181,6 +194,9 @@ class VerifiableProperties(val props: Properties) extends Logging {
     val specifiedProperties = props.propertyNames()
     while (specifiedProperties.hasMoreElements) {
       val key = specifiedProperties.nextElement().asInstanceOf[String]
+      deprecationsOpt.flatMap(deprecations => deprecations.find(_.deprecated == key)).foreach(dp => {
+        warn(dp.toString)
+      })
       if (!referenceSet.contains(key))
         warn("Property %s is not valid".format(key))
       else
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 246b1ec..856a3c3 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -24,46 +24,23 @@ import scala.collection._
 import junit.framework.Assert._
 
 import kafka.message._
-import kafka.server._
-import kafka.utils.TestUtils._
 import kafka.utils._
-import kafka.admin.CreateTopicCommand
 import org.junit.Test
 import kafka.serializer._
-import kafka.cluster.{Broker, Cluster}
 import org.scalatest.junit.JUnit3Suite
-import kafka.integration.KafkaServerTestHarness
 
-class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
-
-  val numNodes = 1
-  val configs =
-    for(props <- TestUtils.createBrokerConfigs(numNodes))
-    yield new KafkaConfig(props) {
-      override val zkConnect = TestZKUtils.zookeeperConnect
-    }
+class ConsumerIteratorTest extends JUnit3Suite {
   val messages = new mutable.HashMap[Int, Seq[Message]]
   val topic = "topic"
-  val group = "group1"
-  val consumer0 = "consumer0"
   val consumedOffset = 5
-  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
-  val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
-                                                           c.brokerId,
-                                                           0,
-                                                           queue,
-                                                           new AtomicLong(consumedOffset),
-                                                           new AtomicLong(0),
-                                                           new AtomicInteger(0),
-                                                           new ConsumerTopicStats("")))
-  val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
-
-  override def setUp() {
-    super.setUp
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
-  }
+  val topicInfo = new PartitionTopicInfo(topic,
+                                         brokerId = 0,
+                                         partitionId = 0,
+                                         queue,
+                                         new AtomicLong(consumedOffset),
+                                         new AtomicLong(0),
+                                         new ConsumerTopicStats(""))
 
   @Test
   def testConsumerIteratorDeduplicationDeepIterator() {
@@ -71,12 +48,12 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
     val messages = messageStrings.map(s => new Message(s.getBytes))
     val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, new AtomicLong(0), messages:_*)
 
-    topicInfos(0).enqueue(messageSet)
+    topicInfo.enqueue(messageSet)
     assertEquals(1, queue.size)
     queue.put(ZookeeperConsumerConnector.shutdownCommand)
 
     val iter = new ConsumerIterator[String, String](queue, 
-                                                    consumerConfig.consumerTimeoutMs,
+                                                    ConsumerConfig.ConsumerTimeoutMs,
                                                     new StringDecoder(), 
                                                     new StringDecoder(),
                                                     enableShallowIterator = false,
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 021f419..f9b921c 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -49,7 +49,6 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
                                                            queue,
                                                            new AtomicLong(0),
                                                            new AtomicLong(0),
-                                                           new AtomicInteger(0),
                                                            new ConsumerTopicStats("")))
 
   var fetcher: ConsumerFetcherManager = null
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index d0e3590..7271062 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -23,15 +23,19 @@ import kafka.utils.TestUtils._
 import kafka.producer.KeyedMessage
 import kafka.serializer.StringEncoder
 import kafka.admin.CreateTopicCommand
-import kafka.utils.TestUtils
+import kafka.utils.{Logging, TestUtils}
 import junit.framework.Assert._
 
-class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
+class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
   val props = createBrokerConfigs(2)
-  val configs = props.map(p => new KafkaConfig(p))
+  val configs = props.map(p => new KafkaConfig(p) {
+    override val flushInterval = 1
+    override val replicaMaxMem = 100.toLong
+  })
   var brokers: Seq[KafkaServer] = null
   val topic1 = "foo"
   val topic2 = "bar"
+  val topic3 = "longfoobar"
 
   override def setUp() {
     super.setUp()
@@ -47,24 +51,27 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
     val partition = 0
     val testMessageList1 = List("test1", "test2", "test3", "test4")
     val testMessageList2 = List("test5", "test6", "test7", "test8")
+    val testMessageList3 = List("averylongmessage")
 
     // create a topic and partition and await leadership
-    for (topic <- List(topic1,topic2)) {
+    for (topic <- List(topic1, topic2, topic3)) {
       CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     }
 
     // send test messages to leader
-    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs), 
+    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
                                                             new StringEncoder(), 
                                                             new StringEncoder())
-    val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))
+    val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++
+                   testMessageList2.map(m => new KeyedMessage(topic2, m, m)) ++
+                   testMessageList3.map(m => new KeyedMessage(topic3, m, m))
     producer.send(messages:_*)
     producer.close()
 
     def logsMatch(): Boolean = {
       var result = true
-      for (topic <- List(topic1, topic2)) {
+      for (topic <- List(topic1, topic2, topic3)) {
         val expectedOffset = brokers.head.getLogManager().getLog(topic, partition).get.logEndOffset
         result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total &&
           (expectedOffset == item.getLogManager().getLog(topic, partition).get.logEndOffset) }
diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
index a720ced..5773e88 100644
--- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
@@ -42,9 +42,9 @@ object ConsumerPerformance {
 
     if(!config.hideHeader) {
       if(!config.showDetailedStats)
-        println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+        println("start.time, end.time, max.memory, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
       else
-        println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+        println("time, max.memory, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
     }
 
     // clean up zookeeper state for this group id for every perf run
@@ -74,7 +74,7 @@ object ConsumerPerformance {
     if(!config.showDetailedStats) {
       val totalMBRead = (totalBytesRead.get*1.0)/(1024*1024)
       println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
-        config.consumerConfig.fetchSize, totalMBRead, totalMBRead/elapsedSecs, totalMessagesRead.get,
+        config.consumerConfig.maxMemory, totalMBRead, totalMBRead/elapsedSecs, totalMessagesRead.get,
         totalMessagesRead.get/elapsedSecs))
     }
     System.exit(0)
@@ -95,7 +95,7 @@ object ConsumerPerformance {
                            .describedAs("gid")
                            .defaultsTo("perf-consumer-" + new Random().nextInt(100000))
                            .ofType(classOf[String])
-    val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
+    val maxMemOpt = parser.accepts("max-memory", "The maximum amount of memory the consumer can use for fetches.")
                            .withRequiredArg
                            .describedAs("size")
                            .ofType(classOf[java.lang.Integer])
@@ -126,7 +126,7 @@ object ConsumerPerformance {
     val props = new Properties
     props.put("groupid", options.valueOf(groupIdOpt))
     props.put("socket.buffer.size", options.valueOf(socketBufferSizeOpt).toString)
-    props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
+    props.put("max.memory", options.valueOf(maxMemOpt).toString)
     props.put("autooffset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
     props.put("zk.connect", options.valueOf(zkConnectOpt))
     props.put("consumer.timeout.ms", "5000")
@@ -190,7 +190,7 @@ object ConsumerPerformance {
       val totalMBRead = (bytesRead*1.0)/(1024*1024)
       val mbRead = ((bytesRead - lastBytesRead)*1.0)/(1024*1024)
       println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id,
-        config.consumerConfig.fetchSize, totalMBRead,
+        config.consumerConfig.maxMemory, totalMBRead,
         1000.0*(mbRead/elapsedMs), messagesRead, ((messagesRead - lastMessagesRead)/elapsedMs)*1000.0))
     }
 
diff --git a/system_test/testcase_to_run.json b/system_test/testcase_to_run.json
index 8252860..c6cf17e 100644
--- a/system_test/testcase_to_run.json
+++ b/system_test/testcase_to_run.json
@@ -1,5 +1,5 @@
 {
     "ReplicaBasicTest"   : [
-        "testcase_0001"
+        "testcase_1"
     ]
 }
