From 5aec503aaf030d73a905c75334bced38332864c4 Mon Sep 17 00:00:00 2001
From: Tejas Patil <tejas.patil.cs@gmail.com>
Date: Mon, 19 Aug 2013 13:54:23 -0700
Subject: [PATCH] KAFKA-1012 Initial patch with embedded producer inside consumer

---
 config/server.properties                           |   14 +
 .../main/scala/kafka/api/OffsetFetchRequest.scala  |    4 +-
 .../src/main/scala/kafka/common/ErrorMapping.scala |    1 +
 .../kafka/common/OffsetMetadataAndError.scala      |   25 +-
 .../consumer/ZookeeperConsumerConnector.scala      |  140 ++++++--
 .../scala/kafka/producer/DefaultPartitioner.scala  |    3 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  167 +++++++---
 core/src/main/scala/kafka/server/KafkaConfig.scala |   14 +
 core/src/main/scala/kafka/server/KafkaServer.scala |   12 +-
 .../main/scala/kafka/server/OffsetManager.scala    |  359 ++++++++++++++++++++
 .../main/scala/kafka/server/ReplicaManager.scala   |   14 +-
 .../api/RequestResponseSerializationTest.scala     |    5 +-
 .../scala/unit/kafka/server/OffsetCommitTest.scala |    5 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |    9 +-
 14 files changed, 664 insertions(+), 108 deletions(-)
 create mode 100644 core/src/main/scala/kafka/server/OffsetManager.scala

diff --git a/config/server.properties b/config/server.properties
index 7685879..332b1c8 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -107,3 +107,17 @@ kafka.csv.metrics.dir=/tmp/kafka_metrics
 kafka.csv.metrics.reporter.enabled=false
 
 log.cleanup.policy=delete
+
+############################# Consumer Offset Management  #############################
+
+# The storage used for storing consumer offsets. Valid values are: "zookeeper" and "kafka"
+offset.storage=kafka
+
+# Kafka's built-in offset management creates a special topic to which all the consumers write
+# offsets. These parameters are for this special offsets topic.
+offsets.topic.replication.factor=1
+offsets.topic.num.partitions=1
+
+# We recommend keeping the offsets topic segment bytes low and enabling the log cleaner to
+# facilitate faster offset loads and reduce the on-disk footprint. Default value is 10 MB
+offsets.topic.segment.bytes=10485760
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index a4c5623..89670f5 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -24,6 +24,8 @@ import kafka.utils.Logging
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.server.OffsetAndMetadata
+
 object OffsetFetchRequest extends Logging {
   val CurrentVersion: Short = 0
   val DefaultClientId = ""
@@ -91,7 +93,7 @@ case class OffsetFetchRequest(groupId: String,
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val responseMap = requestInfo.map {
       case (topicAndPartition) => (topicAndPartition, OffsetMetadataAndError(
-        offset=OffsetMetadataAndError.InvalidOffset,
+        offset=OffsetAndMetadata.InvalidOffset,
         error=ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
       ))
     }.toMap
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 153bc0b..bdc52f3 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -42,6 +42,7 @@ object ErrorMapping {
   val MessageSizeTooLargeCode: Short = 10
   val StaleControllerEpochCode: Short = 11
   val OffsetMetadataTooLargeCode: Short = 12
+  val OffsetsLoadingCode: Short = 13
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 59608a3..f7a75cb 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -1,5 +1,3 @@
-package kafka.common
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,21 +14,30 @@ package kafka.common
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package kafka.common
+
+import kafka.server.OffsetAndMetadata
 
 /**
- * Convenience case class since (topic, partition) pairs are ubiquitous.
+ * A convenience case class that consolidates (offset, metadata, error) which is used in OffsetFetchResponse and OffsetCommitRequest
+ * (although the 'error' field is only relevant for OffsetFetchResponse)
  */
-case class OffsetMetadataAndError(offset: Long, metadata: String = OffsetMetadataAndError.NoMetadata, error: Short = ErrorMapping.NoError) {
+case class OffsetMetadataAndError(offset: Long,
+                                  metadata: String = OffsetAndMetadata.NoMetadata,
+                                  error: Short = ErrorMapping.NoError) {
 
   def this(tuple: (Long, String, Short)) = this(tuple._1, tuple._2, tuple._3)
+  def this(offsetMetadata: OffsetAndMetadata, error: Short) = this(offsetMetadata.offset, offsetMetadata.metadata, error)
+  def this(error: Short) = this(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, error)
 
   def asTuple = (offset, metadata, error)
 
-  override def toString = "OffsetAndMetadata[%d,%s,%d]".format(offset, metadata, error)
-
+  override def toString = "OffsetMetadataAndError[%d,%s,%d]".format(offset, metadata, error)
 }
 
 object OffsetMetadataAndError {
-  val InvalidOffset: Long = -1L;
-  val NoMetadata: String = "";
-}
+  val NoError = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NoError)
+  val OffsetLoading = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.OffsetsLoadingCode)
+  val BrokerNotAvailableCode = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.BrokerNotAvailableCode)
+  val UnknownTopicPartition = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode)
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e3a6420..eff914f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -27,7 +27,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import java.net.InetAddress
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import java.util.UUID
+import java.util.{Properties, UUID}
 import kafka.serializer._
 import kafka.utils.ZkUtils._
 import kafka.common._
@@ -35,6 +35,11 @@ import kafka.client.ClientUtils
 import com.yammer.metrics.core.Gauge
 import kafka.metrics._
 import scala.Some
+import kafka.network.BlockingChannel
+import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
+import util.Random
+import kafka.server.{GroupTopicPartition, OffsetManager}
+import kafka.api.{OffsetFetchResponse, OffsetFetchRequest}
 
 
 /**
@@ -83,7 +88,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
   private var fetcher: Option[ConsumerFetcherManager] = None
-  private var zkClient: ZkClient = null
+  private val zkClient: ZkClient = connectZk()
+  private var offsetFetchChannel: BlockingChannel = createOffsetFetchChannel()
+  private var offsetFetchChannelLock = new Object
+  private var offsetCommitProducer: Producer[String, String] = createOffsetCommitProducer()
+  private var offsetCommitProducerLock = new Object
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
   private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
@@ -109,7 +118,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
   this.logIdent = "[" + consumerIdString + "], "
 
-  connectZk()
   createFetcher()
   if (config.autoCommitEnable) {
     scheduler.startup
@@ -149,9 +157,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient))
   }
 
-  private def connectZk() {
+  private def connectZk() : ZkClient = {
     info("Connecting to zookeeper instance at " + config.zkConnect)
-    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+    new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
   }
 
   def shutdown() {
@@ -171,10 +179,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         sendShutdownToAllQueues()
         if (config.autoCommitEnable)
           commitOffsets()
-        if (zkClient != null) {
+        if (zkClient != null)
           zkClient.close()
-          zkClient = null
-        }
+
+        offsetFetchChannelLock synchronized shutdownOffsetFetchChannel()
+        offsetCommitProducerLock synchronized shutdownOffsetCommitProducer()
       } catch {
         case e =>
           fatal("error during consumer connector shutdown", e)
@@ -243,24 +252,71 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
 
-  def commitOffsets() {
-    if (zkClient == null) {
-      error("zk client is null. Cannot commit offsets")
-      return
+  private def createOffsetFetchChannel() : BlockingChannel = {
+    var channel: BlockingChannel = null
+
+    Random.shuffle(getAllBrokersInCluster(zkClient))
+      .toStream
+      .takeWhile { broker =>
+      try {
+        /* Pick the i'th broker from the shuffled list and attempt to establish a channel to it. If the channel connects
+         * successfully, stop trying; otherwise keep trying until all the brokers have been attempted. */
+        trace("Establishing a channel with the broker at [host,port] = [" + broker.host + "," + broker.port + "]")
+        channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, 3000)
+        channel.connect()
+      } catch {
+        case e: Exception =>
+          channel = null
+          trace("Error while establishing offset fetch channel with " + broker, e)
+      }
+      (channel == null)
     }
-    for ((topic, infos) <- topicRegistry) {
-      val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
-      for (info <- infos.values) {
-        val newOffset = info.getConsumeOffset
-        try {
-          updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partitionId,
-            newOffset.toString)
-        } catch {
-          case t: Throwable =>
-          // log it and let it go
-            warn("exception during commitOffsets",  t)
-        }
-        debug("Committed offset " + newOffset + " for topic " + info)
+
+    if(channel == null)
+      throw new KafkaException("Unable to establish a channel with any of the live brokers.")
+    channel
+  }
+
+  private def shutdownOffsetFetchChannel() {
+    if(offsetFetchChannel != null && offsetFetchChannel.isConnected)
+      offsetFetchChannel.disconnect()
+
+    offsetFetchChannel = null
+  }
+
+  private def createOffsetCommitProducer() : Producer[String, String] = {
+    val props = new Properties()
+    props.put("request.required.acks", "-1")
+    props.put("serializer.class", classOf[StringEncoder].getCanonicalName)
+    props.put("key.serializer.class", classOf[StringEncoder].getCanonicalName)
+    props.put("metadata.broker.list", getAllBrokersInCluster(zkClient).map(broker => broker.host + ":" + broker.port).mkString(","))
+    new Producer[String, String](new ProducerConfig(props))
+  }
+
+  private def shutdownOffsetCommitProducer() {
+    if (offsetCommitProducer != null) {
+      offsetCommitProducer.close()
+      offsetCommitProducer = null
+    }
+  }
+
+  def commitOffsets() {
+    offsetCommitProducerLock synchronized {
+      if (offsetCommitProducer == null)
+        offsetCommitProducer = createOffsetCommitProducer()
+
+      try {
+        debug("Sending offset commit request.")
+        offsetCommitProducer.send(topicRegistry.flatMap(t => t._2.values.map( partitionInfo =>
+          KeyedMessage[String, String](OffsetManager.OffsetsTopicName,
+                                       OffsetManager.offsetCommitKey(config.groupId, t._1, partitionInfo.partitionId),
+                                       config.groupId,
+                                       OffsetManager.offsetCommitValue(partitionInfo.getConsumeOffset()))
+        )).toSeq: _*)
+      } catch {
+        case t: Throwable =>       // log it and let it go
+          shutdownOffsetCommitProducer()
+          warn("Exception during commitOffsets", t)
       }
     }
   }
@@ -579,15 +635,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                       topicDirs: ZKGroupTopicDirs, partition: Int,
                                       topic: String, consumerThreadId: String) {
       val partTopicInfoMap = currentTopicRegistry.get(topic)
+      val currTopicPartition = TopicAndPartition(topic, partition)
+      val gtp = GroupTopicPartition(config.groupId, currTopicPartition)
 
-      val znode = topicDirs.consumerOffsetDir + "/" + partition
-      val offsetString = readDataMaybeNull(zkClient, znode)._1
-      // If first time starting a consumer, set the initial offset to -1
-      val offset =
-        offsetString match {
-          case Some(offsetStr) => offsetStr.toLong
-          case None => PartitionTopicInfo.InvalidOffset
+      val response = offsetFetchChannelLock synchronized {
+        try {
+          debug("Sending offset fetch request for %s".format(gtp))
+          if (offsetFetchChannel == null || !offsetFetchChannel.isConnected)
+            createOffsetFetchChannel()
+
+          offsetFetchChannel.send(OffsetFetchRequest(config.groupId, Seq(currTopicPartition), OffsetFetchRequest.CurrentVersion, 0, "consumer-" + consumerIdString))
+          offsetFetchChannel.receive()
+        } catch {
+          case t: Throwable =>
+            warn("Error in offset fetch request on " + gtp, t)
+            shutdownOffsetFetchChannel()
+            throw t
         }
+      }
+
+      val offsetFetchResponse = OffsetFetchResponse.readFrom(response.buffer)
+
+      /* If the offset is being loaded at the broker end, fail this offset fetch request. For all other errors, the offset would be -1 */
+      if(offsetFetchResponse.requestInfo.count(_._2.error == ErrorMapping.OffsetsLoadingCode) > 0)
+        throw new InvalidOffsetException("Offset fetch request could not fetch all requested offsets as some offsets are being loaded.")
+
+      val topicPartitionAndOffsetMetadataMap = offsetFetchResponse.requestInfoGroupedByTopic(topic)
+      val offset = topicPartitionAndOffsetMetadataMap(currTopicPartition).offset
+      debug("Response for offset fetch on %s is %d".format(gtp,offset))
+
       val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
       val consumedOffset = new AtomicLong(offset)
       val fetchedOffset = new AtomicLong(offset)
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
index 37ddd55..8c6d817 100644
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@ -20,8 +20,7 @@ package kafka.producer
 
 import kafka.utils._
 
-private class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
-  private val random = new java.util.Random
+private[kafka] class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
   
   def partition(key: Any, numPartitions: Int): Int = {
     Utils.abs(key.hashCode) % numPartitions
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0ec031a..ad257ba 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,24 +17,31 @@
 
 package kafka.server
 
-import kafka.admin.AdminUtils
+import kafka.admin.{AdminOperationException, AdminUtils}
 import kafka.api._
 import kafka.message._
 import kafka.network._
 import kafka.log._
-import kafka.utils.ZKGroupTopicDirs
-import org.apache.log4j.Logger
 import scala.collection._
-import java.util.Properties
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
 import kafka.metrics.KafkaMetricsGroup
 import org.I0Itec.zkclient.ZkClient
 import kafka.common._
-import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
-import kafka.network.RequestChannel.Response
+import kafka.utils._
 import kafka.cluster.Broker
 import kafka.controller.KafkaController
+import scala.Predef._
+import scala.collection.Map
+import scala.collection.Set
+import scala.None
+import java.util.Properties
+import kafka.api.PartitionOffsetsResponse
+import scala.Some
+import kafka.api.ProducerResponseStatus
+import kafka.common.TopicAndPartition
+import kafka.network.RequestChannel.Response
+import kafka.api.PartitionFetchInfo
 
 
 /**
@@ -45,6 +52,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val zkClient: ZkClient,
                 val brokerId: Int,
                 val config: KafkaConfig,
+                val offsetManager: OffsetManager,
                 val controller: KafkaController) extends Logging {
 
   private val producerRequestPurgatory =
@@ -59,6 +67,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 //  private var allBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
   private var aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
   private val partitionMetadataLock = new Object
+  private val offsetFetchChannelPool: Pool[Int, BlockingChannel] = new Pool[Int, BlockingChannel]
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
   /**
@@ -91,7 +100,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
     val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
     try {
-      val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
+      val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager)
       val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
       requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
     } catch {
@@ -161,6 +170,21 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   /**
+   * Checks if the current topic is the offsets topic and commits those offsets via the offset manager
+   */
+  def commitIfOffsetsTopic(produceRequest: ProducerRequest, statuses: Map[TopicAndPartition, ProducerResponseStatus]) {
+    if (produceRequest.data.keySet.exists(_.topic == OffsetManager.OffsetsTopicName)) {
+      produceRequest.data
+        .filter (t => statuses(t._1).error == ErrorMapping.NoError)
+        .foreach(_._2.map { m =>
+          val parsedMessage = new OffsetManager.KeyValue(m.message)
+          offsetManager.putOffset(parsedMessage.key, parsedMessage.value)
+        })
+      produceRequest.emptyData()
+    }
+  }
+
+  /**
    * Handle a produce request
    */
   def handleProducerRequest(request: RequestChannel.Request) {
@@ -177,6 +201,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       !produceRequest.data.keySet.exists(
         m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
     if(produceRequest.requiredAcks == 0) {
+      // Populate statuses to check for problems while appending to log.
+      // For offsets topic, offsets are added by offset manager only if there were no errors.
+      val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
+      commitIfOffsetsTopic(produceRequest, statuses)
+
       // send a fake producer response if producer request.required.acks = 0. This mimics the behavior of a 0.7 producer
       // and is tuned for very high throughput
       requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null))
@@ -186,6 +215,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         numPartitionsInError == produceRequest.numPartitions) {
       val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
       val response = ProducerResponse(produceRequest.correlationId, statuses)
+      commitIfOffsetsTopic(produceRequest, statuses)
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
     } else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request
@@ -211,8 +241,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       debug(satisfiedProduceRequests.size +
         " producer requests unblocked during produce to local log.")
       satisfiedProduceRequests.foreach(_.respond())
-      // we do not need the data anymore
-      produceRequest.emptyData()
+      // we do not need the data anymore (except if it's the offsets topic, in which case the data is needed to commit offsets)
+      if(!produceRequest.data.keySet.exists(_.topic == OffsetManager.OffsetsTopicName))
+        produceRequest.emptyData()
     }
   }
   
@@ -374,7 +405,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Read from a single topic/partition at the given offset upto maxSize bytes
    */
-  private def readMessageSet(topic: String, 
+  private def readMessageSet(topic: String,
                              partition: Int, 
                              offset: Long,
                              maxSize: Int, 
@@ -563,17 +594,23 @@ class KafkaApis(val requestChannel: RequestChannel,
       topicMetadata.errorCode match {
         case ErrorMapping.NoError => topicsMetadata += topicMetadata
         case ErrorMapping.UnknownTopicOrPartitionCode =>
-          if (config.autoCreateTopicsEnable) {
-            try {
-              AdminUtils.createTopic(zkClient, topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
+          try {
+            if (config.autoCreateTopicsEnable || topicMetadata.topic == OffsetManager.OffsetsTopicName) {
+              val (numPartitions, replicationFactor, properties) =
+                if (topicMetadata.topic == OffsetManager.OffsetsTopicName)
+                  (config.offsetTopicPartitions, config.offsetTopicReplicationFactor, OffsetManager.OffsetsTopicProperties)
+                else
+                  (config.numPartitions, config.defaultReplicationFactor, new Properties)
+              AdminUtils.createTopic(zkClient, topicMetadata.topic, numPartitions, replicationFactor, properties)
               info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                .format(topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
-            } catch {
-              case e: TopicExistsException => // let it go, possibly another broker created this topic
+                .format(topicMetadata.topic, numPartitions, replicationFactor))
+              topicsMetadata += new TopicMetadata(topicMetadata.topic, topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode)
             }
-            topicsMetadata += new TopicMetadata(topicMetadata.topic, topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode)
-          } else {
-            topicsMetadata += topicMetadata
+            else
+              topicsMetadata += topicMetadata
+          } catch {
+            case e: Exception =>
+              topicsMetadata += new TopicMetadata(topicMetadata.topic, topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode)
           }
         case _ =>
           debug("Error while fetching topic metadata for topic %s due to %s ".format(topicMetadata.topic,
@@ -590,20 +627,23 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Service the Offset commit API
    */
   def handleOffsetCommitRequest(request: RequestChannel.Request) {
+    if(config.offsetStorage.toLowerCase == "kafka")
+      throw new KafkaException("OffsetCommitRequests' are currently not supported for inbuilt-offset manager. Use an embedded producer to commit offsets.")
+
     val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
-    val responseInfo = offsetCommitRequest.requestInfo.map{
-      case (topicAndPartition, metaAndError) => {
-        val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
+    val responseInfo = offsetCommitRequest.requestInfo.map {
+      case (topicPartition, metaAndError) => {
         try {
           if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
-            (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
+            (topicPartition, ErrorMapping.OffsetMetadataTooLargeCode)
           } else {
-            ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
-              topicAndPartition.partition, metaAndError.offset.toString)
-            (topicAndPartition, ErrorMapping.NoError)
+            val key = new GroupTopicPartition(offsetCommitRequest.groupId, topicPartition)
+            val offset = OffsetAndMetadata(metaAndError.offset, metaAndError.metadata)
+            offsetManager.putOffset(key, offset)
+            (topicPartition, ErrorMapping.NoError)
           }
         } catch {
-          case e => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+          case e: Throwable => (topicPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
         }
       }
     }
@@ -617,30 +657,64 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleOffsetFetchRequest(request: RequestChannel.Request) {
     val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
-    val responseInfo = offsetFetchRequest.requestInfo.map( t => {
-      val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic)
-      try {
-        val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition)._1
-        payloadOpt match {
-          case Some(payload) => {
-            (t, OffsetMetadataAndError(offset=payload.toLong, error=ErrorMapping.NoError))
-          } 
-          case None => (t, OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset, OffsetMetadataAndError.NoMetadata,
-                          ErrorMapping.UnknownTopicOrPartitionCode))
+    val offsetsPartition = OffsetManager.partitionFor(offsetFetchRequest.groupId)
+
+    val responses =
+      if(leaderCache.contains(TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition))) {
+        val leaderId = leaderCache(TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader
+        if(leaderId == brokerId) {    // Current broker is the leader for this partition of the offsets topic
+          offsetFetchRequest.requestInfo.map(topicPartition =>
+            (topicPartition, offsetManager.getOffset(new GroupTopicPartition(offsetFetchRequest.groupId, topicPartition))))
         }
-      } catch {
-        case e => 
-          (t, OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset, OffsetMetadataAndError.NoMetadata,
-             ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])))
+        else {      // Some other broker is the leader for the required partition of the offsets topic and we need to forward the request
+          try {
+            if(!offsetFetchChannelPool.contains(leaderId) || !offsetFetchChannelPool.get(leaderId).isConnected) {
+              val broker = aliveBrokers(leaderId)
+              val newChannel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, 30000)
+              newChannel.connect()
+              offsetFetchChannelPool.put(leaderId, newChannel)
+            }
+
+            val offsetFetchChannel = offsetFetchChannelPool.get(leaderId)
+            val redirectedRequest = offsetFetchRequest.copy(clientId = "broker-" + brokerId.toString)
+            offsetFetchChannel.send(redirectedRequest)
+
+            val response = offsetFetchChannel.receive()
+            val offsetFetchResponse = OffsetFetchResponse.readFrom(response.buffer)
+            offsetFetchResponse.requestInfo.toSeq
+          } catch {
+            case t:Throwable =>
+              offsetFetchRequest.requestInfo.map(topicPartition => (topicPartition, OffsetMetadataAndError.UnknownTopicPartition))
+          }
+        }
+      } else {
+        // We don't know which broker is the leader for this partition of the offsets topic. Try to create the offsets topic
+        debug("Could not find leader for the partition %d of offsets topic.".format(offsetsPartition))
+
+        if(!leaderCache.contains(TopicAndPartition(OffsetManager.OffsetsTopicName, 0))) {
+          try {
+            debug("Auto-creating the offsets topic.")
+            AdminUtils.createTopic(zkClient, OffsetManager.OffsetsTopicName, config.offsetTopicPartitions,
+              config.offsetTopicReplicationFactor, OffsetManager.OffsetsTopicProperties)
+            // Return offset = -1 in response as we have just created the offsets topic and there won't be anything consumed w/o offsets topic
+            offsetFetchRequest.requestInfo.map(topicPartition => (topicPartition, OffsetMetadataAndError.NoError))
+          } catch {
+            case aoe: AdminOperationException =>   // would happen if the topic auto-creation above fails
+              offsetFetchRequest.requestInfo.map(topicPartition => (topicPartition, OffsetMetadataAndError.UnknownTopicPartition))
+          }
+        } else
+          offsetFetchRequest.requestInfo.map(topicPartition => (topicPartition, OffsetMetadataAndError.UnknownTopicPartition))
       }
-    })
-    val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), 
-                                           offsetFetchRequest.correlationId)
+
+    val response = new OffsetFetchResponse(responses.toMap, offsetFetchRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
   def close() {
     debug("Shutting down.")
+    if (offsetFetchChannelPool != null)
+      offsetFetchChannelPool.values.foreach(channel => Utils.swallow(channel.disconnect()))
+
     fetchRequestPurgatory.shutdown()
     producerRequestPurgatory.shutdown()
     debug("Shut down complete.")
@@ -742,9 +816,10 @@ class KafkaApis(val requestChannel: RequestChannel,
           val pstat = partitionStatus(new RequestKey(status._1))
           (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset))
         })
-      
-      val response = ProducerResponse(produce.correlationId, finalErrorsAndOffsets)
 
+      commitIfOffsetsTopic(produce, finalErrorsAndOffsets)
+
+      val response = ProducerResponse(produce.correlationId, finalErrorsAndOffsets)
       requestChannel.sendResponse(new RequestChannel.Response(
         request, new BoundedByteBufferSend(response)))
     }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ebbbdea..1a339c6 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -223,4 +223,18 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   
   /* the maximum size for a metadata entry associated with an offset commit */
   val offsetMetadataMaxSize = props.getInt("offset.metadata.max.bytes", 1024)
+
+  /*********** Consumer offset management configuration ***********/
+
+  /* indicates whether consumer offset storage is done inside Kafka itself or in ZooKeeper */
+  val offsetStorage = props.getString("offset.storage", "kafka")
+
+  /* Kafka's built-in offset management creates a special topic (see OffsetManager.OffsetsTopicName)
+     to which all the consumers write offsets. These parameters are for this special offsets topic. */
+  val offsetTopicReplicationFactor: Int = props.getInt("offsets.topic.replication.factor", 1)
+  val offsetTopicPartitions: Int = props.getInt("offsets.topic.num.partitions", 1)
+
+  /* We recommend keeping the offsets topic segment bytes low and enabling the log cleaner to
+     facilitate faster offset loads and reduce the on-disk footprint. Default value is 10 MB */
+  val offsetTopicSegmentBytes: Int = props.getInt("offsets.topic.segment.bytes", 10*1024*1024)
 }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index a925ae1..67d0e14 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -52,10 +52,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   var kafkaController: KafkaController = null
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
   var zkClient: ZkClient = null
+  var offsetManager: OffsetManager = null
 
   /**
    * Start up API for bringing up a single instance of the Kafka server.
-   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
+   * Instantiates the LogManager, the OffsetManager, the SocketServer and the request handlers - KafkaRequestHandlers
    */
   def startup() {
     info("starting")
@@ -84,9 +85,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
     replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
     kafkaController = new KafkaController(config, zkClient)
-    
+    offsetManager = OffsetManager.getOffsetManager(config)
+
     /* start processing requests */
-    apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
+    apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, offsetManager, kafkaController)
     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
    
     Mx4jLoader.maybeLoad()
@@ -102,8 +104,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.hostName, config.port, zkClient)
     kafkaHealthcheck.startup()
 
-    
     registerStats()
+    offsetManager.startup(zkClient, logManager)
     startupComplete.set(true);
     info("started")
   }
@@ -225,6 +227,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       Utils.swallow(kafkaScheduler.shutdown())
       if(apis != null)
         Utils.swallow(apis.close())
+      if(offsetManager != null)
+        Utils.swallow(offsetManager.shutdown())
       if(replicaManager != null)
         Utils.swallow(replicaManager.shutdown())
       if(logManager != null)
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
new file mode 100644
index 0000000..3519cc1
--- /dev/null
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import kafka.log.{LogConfig, LogManager}
+import kafka.common._
+import kafka.serializer.StringDecoder
+import java.util.concurrent.ConcurrentSkipListSet
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.KafkaMetricsGroup
+import kafka.message.Message
+import scala.collection._
+import java.nio.ByteBuffer
+import kafka.common.TopicAndPartition
+import kafka.common.OffsetMetadataAndError
+import kafka.message.MessageAndOffset
+import scala.Some
+import kafka.producer.DefaultPartitioner
+import java.util.Properties
+
+/**
+ * Main interface for offset management which provides operations for storing, accessing, loading and
+ * maintaining consumer offsets
+ */
+trait OffsetManager extends Logging {
+  protected var zkClient: ZkClient = null
+
+  /**
+   * Initializes the offset manager so that its ready to serve offsets
+   *
+   * @param zkClient A pre-connected zookeeper client
+   * @param logManager to read messages from logs while loading the offsets
+   */
+  def startup(zkClient: ZkClient, logManager: LogManager = null)
+
+  /**
+   * Fetch the requested offset value from the underlying offsets storage
+   *
+   * @param key The requested group-topic-partition
+   * @return if the offset is present, return it or else return None
+   */
+  def getOffset(key: GroupTopicPartition): OffsetMetadataAndError
+
+  /**
+   * Store the given key-offset value in the underlying offsets storage
+   *
+   * @param key The group-topic-partition
+   * @param offset The offset to be stored
+   */
+  def putOffset(key: GroupTopicPartition, offset: OffsetAndMetadata)
+
+  /**
+   * Asynchronously load the offsets from logs into the offset manager
+   *
+   * @param partition The partition of the offsets topic which must be loaded
+   */
+  def triggerLoadOffsets(partition: Int)
+
+  /**
+   * Does cleanup and shutdown of any structures maintained by the offset manager
+   */
+  def shutdown()
+}
+
+object OffsetManager {
+  val OffsetsTopicName = "__consumer__offsets"
+  val Delimiter = "##"
+  val OffsetsTopicProperties = new Properties()
+
+  private val Decoder = new StringDecoder
+  private var Manager: OffsetManager = null
+  private var OffsetTopicPartitions = 1
+  private val Partitioner = new DefaultPartitioner
+  private val lock = new Object
+
+  /**
+   * Creates the (singleton) offset manager.
+   *
+   * @param config Used to create an offset manager object based on the "offset.storage" property
+   * @return an offset manager
+   */
+  private def createOffsetManager(config: KafkaConfig) {
+    lock.synchronized {
+      OffsetTopicPartitions = config.offsetTopicPartitions
+      OffsetsTopicProperties.put(LogConfig.SegmentBytesProp, config.offsetTopicSegmentBytes.toString)
+      OffsetsTopicProperties.put(LogConfig.CleanupPolicyProp, "dedupe")
+
+      config.offsetStorage.toLowerCase match {
+        case "kafka"     => Manager = new DefaultOffsetManager
+        case "zookeeper" => Manager = new ZookeeperOffsetManager
+        case _  => throw new InvalidConfigException(("Invalid value of \"offset.storage\" : %s (specify one of " +
+                                                     "\"kafka\" or \"zookeeper\")").format(config.offsetStorage))
+      }
+    }
+  }
+
+  /**
+    * Implements the singleton pattern to allow only one instance of the offset manager in every context.
+    *
+    * @param config Used to create an offset manager object if one is not present
+    * @return an offset manager object associated with the current context
+    */
+  def getOffsetManager(config: KafkaConfig) : OffsetManager = {
+    if(Manager == null)
+      createOffsetManager(config)
+    Manager
+  }
+
+  /**
+   * Gives the offsets' topic partition responsible for the given consumer group id
+   *
+   * @param group consumer group id
+   * @return offsets' topic partition
+   */
+  def partitionFor(group: String): Int = Partitioner.partition(group, OffsetTopicPartitions)
+
+  /**
+   * Generates the key for offset commit message for given (group, topic, partition)
+   *
+   * @return key for offset commit message
+   */
+  def offsetCommitKey(group: String, topic: String, partition: Int): String =
+    group + Delimiter + topic + Delimiter + partition
+
+  /**
+   * Generates the payload for offset commit message from given offset and metadata
+   *
+   * @param offset input offset
+   * @param metadata metadata
+   * @return payload for offset commit message
+   */
+  def offsetCommitValue(offset: Long, metadata: String = OffsetAndMetadata.NoMetadata): String =
+    offset.toString + Delimiter + metadata + Delimiter + SystemTime.milliseconds.toString
+
+  /**
+   * Decodes the input byte-buffer using a string decoder and splits it using the delimiter for offset messages
+   *
+   * @param text input byte-buffer
+   * @return an array of decoded strings
+   */
+  private def decode(text: ByteBuffer) : Array[String] =
+    OffsetManager.Decoder.fromBytes(Utils.readBytes(text)).split(Delimiter)
+
+  /**
+   * A convenience case class for key-value maintained by offset manager
+   * Provides a secondary constructor which is useful to extract and decode the key-value pair from a Kafka message
+   */
+  case class KeyValue(key: GroupTopicPartition, value: OffsetAndMetadata) {
+    def this(message: Message) = this(new GroupTopicPartition(decode(message.key)), new OffsetAndMetadata(decode(message.payload)))
+    override def toString = "[%s-%s]".format(key.toString, value.toString)
+    def asTuple = (key, value)
+  }
+}
+
+/**
+ * An inbuilt table based implementation of the offset manager trait
+ */
+private class DefaultOffsetManager extends OffsetManager with KafkaMetricsGroup {
+
+  /* This in-memory hash table would store the consumer offsets */
+  private val offsetsTable = new Pool[GroupTopicPartition, OffsetAndMetadata]
+
+  /* A set of partitions of offsets' topic which the offset manager is currently loading */
+  private val loading = new ConcurrentSkipListSet[Int]
+
+  /* Used for accessing kafka logs during loading */
+  private var logManager: LogManager = null
+
+  /* Thread-pool for performing the asynchronous loading. If a broker, leading 'X' partitions of the offsets' topic, fails
+   * then the ownership of those partitions would be distributed across the remaining brokers. The #partitions to be
+   * loaded per broker would then be smaller, so a thread pool of just 5 threads suffices */
+  private val scheduler = new KafkaScheduler(5, "offsets-table-loader-")
+
+  /* While reading messages from disk, read 10 MB at a time */
+  private val readBatchSize = 10*1024*1024
+
+  /* All the commits which happen while loading is in progress go here to avoid loaded offsets from replacing the newer ones */
+  private val commitsWhileLoading = mutable.Set[GroupTopicPartition]()
+
+  /* Lock used to prevent loading process from replacing new offsets being committed */
+  private val loadLock = new Object
+
+  newGauge("DefaultOffsetManager-NumOffsets",
+    new Gauge[Int] {
+      def value = offsetsTable.size
+  })
+
+  newGauge("DefaultOffsetManager-NumConsumerGroups",
+    new Gauge[Int] {
+      def value = offsetsTable.keys.toSeq.map(_.group).toSet.size
+  })
+
+  def startup(zkClient: ZkClient, logManager: LogManager) {
+    info("Starting default offset manager")
+    this.zkClient = zkClient
+    this.logManager = logManager
+    try {
+      scheduler.startup()
+    } catch {
+      case e: IllegalStateException =>
+        warn("Scheduler was already running.")
+    }
+  }
+
+  /**
+   * Checks if the requested offset partition is currently being loaded and if so would return a LoadingOffset code
+   * else would return the offset value from the offset table. Returns "None" if no value is found in the offset table.
+   */
+  def getOffset(key: GroupTopicPartition): OffsetMetadataAndError = {
+    val offsetMetadataError = if(loading.contains(OffsetManager.partitionFor(key.group)))
+      OffsetMetadataAndError.OffsetLoading
+    else {
+      val offsetAndMetadata = offsetsTable.get(key)
+      if (offsetAndMetadata == null)
+        OffsetMetadataAndError.UnknownTopicPartition
+      else
+        OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError)
+    }
+
+    debug("Fetched offset %s for key %s".format(offsetMetadataError, key))
+    offsetMetadataError
+  }
+
+  /**
+   * Adds the key-offset pair to the offsets table. If the requested offset partition is currently
+   * being loaded then it would be added to 'commitsWhileLoading' so that loading won't overwrite it.
+   */
+  def putOffset(key: GroupTopicPartition, offset: OffsetAndMetadata) = {
+    if(loading.contains(OffsetManager.partitionFor(key.group)))
+      loadLock synchronized commitsWhileLoading.add(key)
+
+    offsetsTable.put(key,offset)
+    debug("Added offset %s for key %s".format(offset, key))
+  }
+
+  /**
+   * Asynchronously read the messages for offsets topic from the logs and populate the hash table of offsets.
+   */
+  def triggerLoadOffsets(offsetsPartition: Int) {
+    loading.add(offsetsPartition)   // prevent any offset fetch directed to this partition of the offsets topic
+    scheduler.schedule("[%s,%d]".format(OffsetManager.OffsetsTopicName, offsetsPartition), loadOffsets)
+
+    def loadOffsets() {
+      val startTime = SystemTime.milliseconds
+      val topicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
+
+      try {
+        logManager.getLog(topicPartition) match {
+          case Some(log) =>
+            var currOffset = log.logSegments.head.baseOffset
+
+            while(currOffset < log.logEndOffset) {
+              log.read(currOffset, readBatchSize).iterator.foreach {
+                case m: MessageAndOffset =>
+                  val record = new OffsetManager.KeyValue(m.message)
+                  loadLock synchronized {
+                    if(!commitsWhileLoading.contains(record.key))
+                      offsetsTable.put(record.key, record.value)
+                  }
+                  currOffset = currOffset + 1
+              }
+            }
+
+            loading.remove(offsetsPartition)  // resume offset fetch directed to this partition of the offsets topic
+            loadLock synchronized {           // remove the offsets committed for this offsets partition while loading was in progress
+              commitsWhileLoading.retain(gtp => OffsetManager.partitionFor(gtp.group) != offsetsPartition)
+            }
+            info("Loading offsets for %s completed in %d ms".format(topicPartition, SystemTime.milliseconds - startTime))
+          case None =>
+            loading.remove(offsetsPartition)  // unlock to prevent from blocking offset fetch requests
+            warn("No log found for " + topicPartition)
+        }
+      } catch {
+        case t: Throwable =>
+          error("Error in loading offsets from " + topicPartition, t)
+          loading.remove(offsetsPartition)  // unlock to prevent from blocking offset fetch requests
+      }
+    }
+  }
+
+  def shutdown() {
+    info("Shutting down.")
+    try {
+      scheduler.shutdown()
+    } catch {
+      case e: Exception =>                       // indicates that scheduler was already shutdown during previous attempt
+        debug("Scheduler was already shutdown.") // to shutdown offset manager. Log the exception and let it go
+    }
+  }
+}
+
+/**
+ * A Zookeeper based implementation of the offset manager trait
+ */
+private class ZookeeperOffsetManager extends OffsetManager {
+  private def getPartitionDir(key: GroupTopicPartition): String =
+    new ZKGroupTopicDirs(key.group, key.topicPartition.topic).consumerOffsetDir + "/" + key.topicPartition.partition
+
+  def startup(zkClient: ZkClient, logManager: LogManager) = this.zkClient = zkClient
+
+  def getOffset(key: GroupTopicPartition): OffsetMetadataAndError = {
+    val offsetMetadataError = ZkUtils.readDataMaybeNull(zkClient, getPartitionDir(key))._1 match {
+      case Some(value) =>
+        val splits = value.split(OffsetManager.Delimiter)
+        new OffsetMetadataAndError(splits(0).toLong, splits(1))
+      case None =>
+        OffsetMetadataAndError.UnknownTopicPartition
+    }
+    debug("Fetched offset %d for key %s".format(offsetMetadataError.offset, key))
+    offsetMetadataError
+  }
+
+  def putOffset(key: GroupTopicPartition, value: OffsetAndMetadata) = {
+    ZkUtils.updatePersistentPath(zkClient, getPartitionDir(key), value.offset.toString + OffsetManager.Delimiter + value.metadata)
+    debug("Added offset %s for key %s".format(value, key))
+  }
+
+  def triggerLoadOffsets(offsetsPartition: Int) {}
+  def shutdown() { info("shutting down") }
+}
+
+/**
+ * A case class for (group, topic, partition) which is used as key by offset manager
+ */
+case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
+  def this(group: String, topic: String, partition: Int) = this(group, new TopicAndPartition(topic, partition))
+  def this(entry: Array[String]) = this(entry(0), entry(1), entry(2).toInt)
+  override def toString = "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
+}
+
+/**
+ * A case class for (offset, metadata) which is used as value by offset manager
+ */
+case class OffsetAndMetadata(offset: Long, metadata: String = OffsetAndMetadata.NoMetadata) {
+  def this(entry: Array[String]) = this(entry(0).toLong, entry(1))
+  override def toString = "OffsetAndMetadata[%d,%s]".format(offset, metadata)
+}
+
+object OffsetAndMetadata {
+  val InvalidOffset: Long = -1L
+  val NoMetadata: String = ""
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 73c87c6..dd41cff 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -197,7 +197,7 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
+  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest, offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = {
     leaderAndISRRequest.partitionStateInfos.foreach(p =>
       stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlation id %d received from controller %d epoch %d for partition [%s,%d]"
                                 .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
@@ -220,7 +220,7 @@ class ReplicaManager(val config: KafkaConfig,
         val requestedLeaderId = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
         try {
           if(requestedLeaderId == config.brokerId)
-            makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId)
+            makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId, offsetManager)
           else
             makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.aliveLeaders,
                          leaderAndISRRequest.correlationId)
@@ -250,17 +250,22 @@
   }
 
   private def makeLeader(controllerId: Int, epoch:Int, topic: String, partitionId: Int,
-                         partitionStateInfo: PartitionStateInfo, correlationId: Int) = {
+                         partitionStateInfo: PartitionStateInfo, correlationId: Int, offsetManager: OffsetManager) = {
    val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
    stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
                             "starting the become-leader transition for partition [%s,%d]")
                               .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
    val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
    if (partition.makeLeader(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, correlationId)) {
+
+      // if this broker is made leader for some partition of offsets' topic, then offset manager must load it
+      if (topic == OffsetManager.OffsetsTopicName && !leaderPartitions.contains(partition))
+        offsetManager.triggerLoadOffsets(partitionId)
+
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
       } 
     }
     stateChangeLogger.trace("Broker %d completed become-leader transition for partition [%s,%d]".format(localBrokerId, topic, partitionId))
   }
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index bc415e3..c8bfe51 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -26,6 +26,7 @@ import kafka.cluster.Broker
 import collection.mutable._
 import kafka.common.{TopicAndPartition, ErrorMapping, OffsetMetadataAndError}
 import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.server.OffsetAndMetadata
 
 
 object SerializationTestUtils{
@@ -149,7 +150,7 @@ object SerializationTestUtils{
   def createTestOffsetCommitRequest: OffsetCommitRequest = {
     new OffsetCommitRequest("group 1", collection.immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(offset=42L, metadata="some metadata"),
-      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(offset=100L, metadata=OffsetMetadataAndError.NoMetadata)
+      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(offset=100L, metadata=OffsetAndMetadata.NoMetadata)
     ))
   }
 
@@ -170,7 +171,7 @@ object SerializationTestUtils{
   def createTestOffsetFetchResponse: OffsetFetchResponse = {
     new OffsetFetchResponse(collection.immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(42L, "some metadata", ErrorMapping.NoError),
-      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetMetadataAndError.NoMetadata,
+      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetAndMetadata.NoMetadata,
         ErrorMapping.UnknownTopicOrPartitionCode)
     ))
   }
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index c0475d0..6a7943e 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -23,7 +23,6 @@ import junit.framework.Assert._
 import java.util.Properties
 import kafka.consumer.SimpleConsumer
 import org.junit.{After, Before, Test}
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import kafka.api.{OffsetCommitRequest, OffsetFetchRequest}
@@ -145,8 +144,8 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(43L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.offset)
     assertEquals(44L, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.offset)
     assertEquals(45L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.offset)
-    assertEquals(OffsetMetadataAndError.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset)
-    assertEquals(OffsetMetadataAndError.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset)
+    assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset)
+    assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index bab436d..75002ba 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -19,14 +19,13 @@ package kafka.server
 import kafka.cluster.{Partition, Replica}
 import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message}
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.network.RequestChannel
 import kafka.utils.{ZkUtils, Time, TestUtils, MockTime}
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
 import kafka.api._
 import scala.Some
-import org.junit.Assert._
 import kafka.common.TopicAndPartition
 
 
@@ -92,7 +91,8 @@ class SimpleFetchTest extends JUnit3Suite {
     // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary)
     // don't provide replica or leader callbacks since they will not be tested here
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head, controller)
+    val offsetManager = EasyMock.createMock(classOf[kafka.server.DefaultOffsetManager])
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head, offsetManager, controller)
 
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
@@ -161,7 +161,8 @@ class SimpleFetchTest extends JUnit3Suite {
     val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController])
 
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head, controller)
+    val offsetManager = EasyMock.createMock(classOf[kafka.server.DefaultOffsetManager])
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head, offsetManager, controller)
 
     /**
      * This fetch, coming from a replica, requests all data at offset "15".  Because the request is coming
-- 
1.7.1

