diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 713c7c9..1135f5d 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -22,6 +22,7 @@ import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, FetchResponsePartitionData}
 import kafka.common.TopicAndPartition
+import kafka.common.ErrorMapping
 
 
 class ConsumerFetcherThread(name: String,
@@ -57,7 +58,11 @@ class ConsumerFetcherThread(name: String,
       case _ => startTimestamp = OffsetRequest.LatestTime
     }
     val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
-    val newOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+    val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
+    val newOffset = partitionErrorAndOffset.error match {
+      case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
+      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
+    }
     val pti = partitionMap(topicAndPartition)
     pti.resetFetchOffset(newOffset)
     pti.resetConsumeOffset(newOffset)
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 6003cab..0b85865 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -23,7 +23,6 @@ import kafka.message._
 import kafka.utils.Logging
 
 class PartitionTopicInfo(val topic: String,
-                         val brokerId: Int,
                          val partitionId: Int,
                          private val chunkQueue: BlockingQueue[FetchedDataChunk],
                          private val consumedOffset: AtomicLong,
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 42a9628..bfeee26 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -402,18 +402,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val brokers = getAllBrokersInCluster(zkClient)
       val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId).topicsMetadata
       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
-      val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
       topicsMetadata.foreach(m =>{
         val topic = m.topic
         val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
         partitionsPerTopicMap.put(topic, partitions)
-        m.partitionsMetadata.foreach(pmd =>{
-          val partitionId = pmd.partitionId
-          val leaderOpt = pmd.leader
-          if(leaderOpt.isDefined)
-            leaderIdForPartitionsMap.put((topic, partitionId), leaderOpt.get.id)
-        })
       })
+
       /**
        * fetchers must be stopped to avoid data duplication, since if the current
        * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
@@ -456,7 +450,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             for (i <- startPart until startPart + nParts) {
               val partition = curPartitions(i)
               info(consumerThreadId + " attempting to claim partition " + partition)
-              addPartitionTopicInfo(currentTopicRegistry, leaderIdForPartitionsMap, topicDirs, partition, topic, consumerThreadId)
+              addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
               // record the partition ownership decision
               partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
             }
@@ -573,39 +567,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
 
     private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
-                                      leaderIdForPartitionsMap: Map[(String, Int), Int],
                                       topicDirs: ZKGroupTopicDirs, partition: Int,
                                       topic: String, consumerThreadId: String) {
       val partTopicInfoMap = currentTopicRegistry.get(topic)
 
-      // find the leader for this partition
-      val leaderOpt = leaderIdForPartitionsMap.get((topic, partition))
-      leaderOpt match {
-        case None => throw new NoBrokersForPartitionException("No leader available for partition %d on topic %s".
-          format(partition, topic))
-        case Some(l) => debug("Leader for partition %d for topic %s is %d".format(partition, topic, l))
-      }
-      val leader = leaderOpt.get
-
       val znode = topicDirs.consumerOffsetDir + "/" + partition
       val offsetString = readDataMaybeNull(zkClient, znode)._1
-      // If first time starting a consumer, set the initial offset based on the config
+      // If first time starting a consumer, use -1 as a sentinel: AbstractFetcherThread.addPartition treats a negative offset as out-of-range and resolves it via handleOffsetOutOfRange (honoring auto.offset.reset)
       val offset =
         offsetString match {
           case Some(offsetStr) => offsetStr.toLong
-          case None =>
-            config.autoOffsetReset match {
-              case OffsetRequest.SmallestTimeString =>
-                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, config.clientId)
-              case OffsetRequest.LargestTimeString =>
-                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, config.clientId)
-            }
+          case None => -1
         }
       val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
       val consumedOffset = new AtomicLong(offset)
       val fetchedOffset = new AtomicLong(offset)
       val partTopicInfo = new PartitionTopicInfo(topic,
-                                                 leader,
                                                  partition,
                                                  queue,
                                                  consumedOffset,
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index bdb1d03..178a9b1 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -150,7 +150,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
     partitionMapLock.lock()
     try {
-      partitionMap.put(TopicAndPartition(topic, partitionId), initialOffset)
+      val topicPartition = TopicAndPartition(topic, partitionId)
+      partitionMap.put(
+          topicPartition,
+          if(initialOffset < 0) handleOffsetOutOfRange(topicPartition) else initialOffset)
       partitionMapCond.signalAll()
     } finally {
       partitionMapLock.unlock()
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 0b5363f..8ae30ea 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -50,7 +50,6 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
   val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
-                                                           c.brokerId,
                                                            0,
                                                            queue,
                                                            new AtomicLong(consumedOffset),
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index e12f5a7..f7ee914 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -312,7 +312,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     assertEquals(topic, topicRegistry.map(r => r._1).head)
     val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._2)))
     val brokerPartition = topicsAndPartitionsInRegistry.head._2.head
-    assertEquals(0, brokerPartition.brokerId)
     assertEquals(0, brokerPartition.partitionId)
 
     // also check partition ownership
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 61d9fc9..5a57bd1 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -44,7 +44,6 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
   val shutdown = ZookeeperConsumerConnector.shutdownCommand
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
-                                                           c.brokerId,
                                                            0,
                                                            queue,
                                                            new AtomicLong(0),
