From 50c870849ebbaf6729986e907da7cab297ed691c Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Mon, 14 Jan 2013 18:01:05 -0800
Subject: [PATCH] Reduce memory retained by request purgatories: pass message
 sizes instead of MessageSets to DelayedProduce/FetchRequestPurgatory and
 release ProducerRequest data after responses are sent

---
 .../src/main/scala/kafka/api/ProducerRequest.scala |    9 ++++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   33 ++++++++++++-------
 2 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index ffa96a6..e54fcbc 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -56,13 +56,13 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
                            clientId: String,
                            requiredAcks: Short,
                            ackTimeoutMs: Int,
-                           data: Map[TopicAndPartition, ByteBufferMessageSet])
+                           var data: Map[TopicAndPartition, ByteBufferMessageSet])
     extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
 
   /**
    * Partitions the data into a map of maps (one for each topic).
    */
-  private lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
+  private var dataGroupedByTopic = data.groupBy(_._1.topic)
 
   def this(correlationId: Int,
            clientId: String,
@@ -120,5 +120,10 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
 
   def numPartitions = data.size
 
+  def resetData(){
+    data = null
+    dataGroupedByTopic = null
+  }
+
 }
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 60752fb..a0352de 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -156,8 +156,8 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Check if a partitionData from a produce request can unblock any
    * DelayedFetch requests.
    */
-  def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messages: MessageSet) {
-    val satisfied =  fetchRequestPurgatory.update(RequestKey(topic, partition), messages)
+  def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messageSizeInBytes: Int) {
+    val satisfied =  fetchRequestPurgatory.update(RequestKey(topic, partition), messageSizeInBytes)
     trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
 
     // send any newly unblocked responses
@@ -182,8 +182,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
     val numPartitionsInError = localProduceResults.count(_.error.isDefined)
+
     produceRequest.data.foreach(partitionAndData =>
-      maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2))
+      maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2.sizeInBytes))
 
     val allPartitionHaveReplicationFactorOne =
       !produceRequest.data.keySet.exists(
@@ -201,10 +202,14 @@ class KafkaApis(val requestChannel: RequestChannel,
       val producerRequestKeys = produceRequest.data.keys.map(
         topicAndPartition => new RequestKey(topicAndPartition)).toSeq
       val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.end + 1)).toMap
+      val messageSizes = produceRequest.data.map(r => r._1 -> r._2.sizeInBytes).toMap
+
       val delayedProduce = new DelayedProduce(producerRequestKeys, 
                                               request,
                                               statuses,
-                                              produceRequest, 
+                                              produceRequest.requiredAcks,
+                                              produceRequest.correlationId,
+                                              messageSizes,
                                               produceRequest.ackTimeoutMs.toLong)
       producerRequestPurgatory.watch(delayedProduce)
 
@@ -220,6 +225,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       debug(satisfiedProduceRequests.size +
         " producer requests unblocked during produce to local log.")
       satisfiedProduceRequests.foreach(_.respond())
+      // we do not need the data anymore
+      produceRequest.resetData()
     }
   }
   
@@ -508,14 +515,14 @@ class KafkaApis(val requestChannel: RequestChannel,
    * A holding pen for fetch requests waiting to be satisfied
    */
   class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
-          extends RequestPurgatory[DelayedFetch, MessageSet](brokerId, purgeInterval) {
+          extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) {
     this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
 
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
      */
-    def checkSatisfied(messages: MessageSet, delayedFetch: DelayedFetch): Boolean = {
-      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messages.sizeInBytes)
+    def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = {
+      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes)
       accumulatedSize >= delayedFetch.fetch.minBytes
     }
 
@@ -543,7 +550,9 @@ class KafkaApis(val requestChannel: RequestChannel,
   class DelayedProduce(keys: Seq[RequestKey],
                        request: RequestChannel.Request,
                        initialErrorsAndOffsets: Map[TopicAndPartition, ProducerResponseStatus],
-                       val produce: ProducerRequest,
+                       requiredAcks: Short,
+                       correlationId: Int,
+                       messageSizes: Map[TopicAndPartition, Int],
                        delayMs: Long)
           extends DelayedRequest(keys, request, delayMs) with Logging {
 
@@ -576,7 +585,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset))
         })
       
-      val response = ProducerResponse(produce.correlationId, finalErrorsAndOffsets)
+      val response = ProducerResponse(correlationId, finalErrorsAndOffsets)
 
       requestChannel.sendResponse(new RequestChannel.Response(
         request, new BoundedByteBufferSend(response)))
@@ -604,7 +613,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         val partitionOpt = replicaManager.getPartition(topic, partitionId)
         val (hasEnough, errorCode) = partitionOpt match {
           case Some(partition) =>
-            partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks)
+            partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, requiredAcks)
           case None =>
             (false, ErrorMapping.UnknownTopicOrPartitionCode)
         }
@@ -616,8 +625,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           fetchPartitionStatus.error = ErrorMapping.NoError
         }
         if (!fetchPartitionStatus.acksPending) {
-          val messages = produce.data(followerFetchRequestKey.topicAndPartition)
-          maybeUnblockDelayedFetchRequests(topic, partitionId, messages)
+          val messageSizeInBytes = messageSizes(followerFetchRequestKey.topicAndPartition)
+          maybeUnblockDelayedFetchRequests(topic, partitionId, messageSizeInBytes)
         }
       }
 
-- 
1.7.1

