From 123d874615f095a22e725892ed4c085e926eef46 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Mon, 28 Jan 2013 17:53:46 -0800
Subject: [PATCH] Reduce memory footprint

	modified:   bin/kafka-run-class.sh
	modified:   core/src/main/scala/kafka/api/ProducerRequest.scala
	modified:   core/src/main/scala/kafka/network/RequestChannel.scala
	modified:   core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
	modified:   core/src/main/scala/kafka/server/KafkaApis.scala
	modified:   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
	modified:   core/src/test/scala/unit/kafka/network/SocketServerTest.scala
	modified:   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
	modified:   system_test/replication_testsuite/config/server.properties
	modified:   system_test/replication_testsuite/replica_basic_test.py
	modified:   system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
---
 bin/kafka-run-class.sh                             |    2 +-
 .../src/main/scala/kafka/api/ProducerRequest.scala |   13 +++++++++----
 .../main/scala/kafka/network/RequestChannel.scala  |    5 +++--
 .../kafka/producer/async/DefaultEventHandler.scala |    8 ++++----
 core/src/main/scala/kafka/server/KafkaApis.scala   |   18 ++++++++++--------
 .../api/RequestResponseSerializationTest.scala     |    2 +-
 .../unit/kafka/network/SocketServerTest.scala      |    8 +++++---
 .../unit/kafka/producer/SyncProducerTest.scala     |    2 +-
 .../replication_testsuite/config/server.properties |    2 +-
 .../replication_testsuite/replica_basic_test.py    |   12 +++++++-----
 .../testcase_0001/testcase_0001_properties.json    |    9 ++++++---
 11 files changed, 48 insertions(+), 33 deletions(-)

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index decba0e..c5c1973 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -67,7 +67,7 @@ if [ -z "$KAFKA_JMX_OPTS" ]; then
 fi
 
 if [ -z "$KAFKA_OPTS" ]; then
-  KAFKA_OPTS="-Xmx512M -server  -Dlog4j.configuration=file:$base_dir/config/log4j.properties"
+  KAFKA_OPTS="-Xms5g -Xmx5g -XX:NewSize=3g -XX:MaxNewSize=3g  -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:SurvivorRatio=128 -verbose:gc -XX:+PrintGCApplicationStoppedTime -XX:InitialTenuringThreshold=15 -XX:MaxTenuringThreshold=15 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:logs/gc.log -server  -Dlog4j.configuration=file:$base_dir/config/log4j.properties"
 fi
 
 if [  $JMX_PORT ]; then
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 72b2cba..8842ce9 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -49,7 +49,7 @@ object ProducerRequest {
       })
     })
 
-    ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, Map(partitionDataPairs:_*))
+    ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, collection.mutable.Map(partitionDataPairs:_*))
   }
 }
 
@@ -58,19 +58,20 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
                            clientId: String,
                            requiredAcks: Short,
                            ackTimeoutMs: Int,
-                           data: Map[TopicAndPartition, ByteBufferMessageSet])
+                           data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
     extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
 
   /**
    * Partitions the data into a map of maps (one for each topic).
    */
   private lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
+  val topicPartitionMessageSizeMap = data.map(r => r._1 -> r._2.sizeInBytes).toMap
 
   def this(correlationId: Int,
            clientId: String,
            requiredAcks: Short,
            ackTimeoutMs: Int,
-           data: Map[TopicAndPartition, ByteBufferMessageSet]) =
+           data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) =
     this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
 
   def writeTo(buffer: ByteBuffer) {
@@ -130,7 +131,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
     producerRequest.append("; ClientId: " + clientId)
     producerRequest.append("; RequiredAcks: " + requiredAcks)
     producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
-    producerRequest.append("; TopicAndPartition: " + data.map(r => r._1 -> r._2.sizeInBytes).toMap.mkString(","))
+    producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.map(r => r._1 -> r._2).toMap.mkString(","))
     producerRequest.toString()
   }
 
@@ -142,5 +143,9 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
     val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
   }
+
+  def emptyData(){
+    data.clear()
+  }
 }
 
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 5185dec..8b318d9 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -31,7 +31,7 @@ object RequestChannel extends Logging {
   val AllDone = new Request(1, 2, getShutdownReceive(), 0)
 
   def getShutdownReceive() = {
-    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, ByteBufferMessageSet]())
+    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
     val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
     byteBuffer.putShort(RequestKeys.ProduceKey)
     emptyProducerRequest.writeTo(byteBuffer)
@@ -39,13 +39,14 @@ object RequestChannel extends Logging {
     byteBuffer
   }
 
-  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long) {
+  case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long) {
     @volatile var dequeueTimeMs = -1L
     @volatile var apiLocalCompleteTimeMs = -1L
     @volatile var responseCompleteTimeMs = -1L
     val requestId = buffer.getShort()
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer.rewind()
+    buffer = null
     trace("Received request : %s".format(requestObj))
 
     def updateRequestMetrics() {
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 374cd6b..63ad42e 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -141,8 +141,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     serializedMessages
   }
 
-  def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
-    val ret = new HashMap[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
+  def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
+    val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
     try {
       for (message <- messages) {
         val topicPartitionsList = getPartitionListForTopic(message)
@@ -226,7 +226,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
    * @param messagesPerTopic the messages as a map from (topic, partition) -> messages
    * @return the set (topic, partitions) messages which incurred an error sending or processing
    */
-  private def send(brokerId: Int, messagesPerTopic: Map[TopicAndPartition, ByteBufferMessageSet]) = {
+  private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
     if(brokerId < 0) {
       warn("Failed to send data %s since partitions %s don't have a leader".format(messagesPerTopic.map(_._2),
         messagesPerTopic.map(_._1.toString).mkString(",")))
@@ -268,7 +268,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     }
   }
 
-  private def groupMessagesToSet(messagesPerTopicAndPartition: Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
+  private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
     /** enforce the compressed.topics config here.
       *  If the compression codec is anything other than NoCompressionCodec,
       *    Enable compression only for specified topics if any
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0a1a11a..877be4c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -99,8 +99,8 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Check if a partitionData from a produce request can unblock any
    * DelayedFetch requests.
    */
-  def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messages: MessageSet) {
-    val satisfied =  fetchRequestPurgatory.update(RequestKey(topic, partition), messages)
+  def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messageSizeInBytes: Int) {
+    val satisfied =  fetchRequestPurgatory.update(RequestKey(topic, partition), messageSizeInBytes)
     trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
 
     // send any newly unblocked responses
@@ -122,7 +122,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val numPartitionsInError = localProduceResults.count(_.error.isDefined)
     produceRequest.data.foreach(partitionAndData =>
-      maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2))
+      maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2.sizeInBytes))
 
     val allPartitionHaveReplicationFactorOne =
       !produceRequest.data.keySet.exists(
@@ -159,6 +159,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       debug(satisfiedProduceRequests.size +
         " producer requests unblocked during produce to local log.")
       satisfiedProduceRequests.foreach(_.respond())
+      // we do not need the data anymore
+      produceRequest.emptyData()
     }
   }
   
@@ -435,14 +437,14 @@ class KafkaApis(val requestChannel: RequestChannel,
    * A holding pen for fetch requests waiting to be satisfied
    */
   class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
-          extends RequestPurgatory[DelayedFetch, MessageSet](brokerId, purgeInterval) {
+          extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) {
     this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
 
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
      */
-    def checkSatisfied(messages: MessageSet, delayedFetch: DelayedFetch): Boolean = {
-      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messages.sizeInBytes)
+    def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = {
+      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes)
       accumulatedSize >= delayedFetch.fetch.minBytes
     }
 
@@ -543,8 +545,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           fetchPartitionStatus.error = ErrorMapping.NoError
         }
         if (!fetchPartitionStatus.acksPending) {
-          val messages = produce.data(followerFetchRequestKey.topicAndPartition)
-          maybeUnblockDelayedFetchRequests(topic, partitionId, messages)
+          val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition)
+          maybeUnblockDelayedFetchRequests(topic, partitionId, messageSizeInBytes)
         }
       }
 
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 26f31ec..d0c7b90 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -61,7 +61,7 @@ object SerializationTestUtils{
         case(partitionDataMessage, partition) =>
           (TopicAndPartition(topic, partition), partitionDataMessage)
       })
-    collection.immutable.Map(groupedData:_*)
+    collection.mutable.Map(groupedData:_*)
   }
 
   private val requestInfos = collection.immutable.Map(
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 7395cbc..820b9ec 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -60,8 +60,10 @@ class SocketServerTest extends JUnitSuite {
   /* A simple request handler that just echos back the response */
   def processRequest(channel: RequestChannel) {
     val request = channel.receiveRequest
-    val id = request.buffer.getShort
-    val send = new BoundedByteBufferSend(request.buffer.slice)
+    val byteBuffer = ByteBuffer.allocate(request.requestObj.sizeInBytes)
+    request.requestObj.writeTo(byteBuffer)
+    byteBuffer.rewind()
+    val send = new BoundedByteBufferSend(byteBuffer)
     channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
   }
 
@@ -80,7 +82,7 @@ class SocketServerTest extends JUnitSuite {
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
     val emptyRequest =
-      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
+      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
 
     val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
     emptyRequest.writeTo(byteBuffer)
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 89ba944..264ee51 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -84,7 +84,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)
diff --git a/system_test/replication_testsuite/config/server.properties b/system_test/replication_testsuite/config/server.properties
index dacf158..de15bd7 100644
--- a/system_test/replication_testsuite/config/server.properties
+++ b/system_test/replication_testsuite/config/server.properties
@@ -53,7 +53,7 @@ log.dir=/tmp/kafka_server_logs
 
 # The number of logical partitions per topic per server. More partitions allow greater parallelism
 # for consumption, but also mean more files.
-num.partitions=5
+num.partitions=1
 
 # Overrides for for the default given by num.partitions on a per-topic basis
 #topic.partition.count.map=topic1:3, topic2:4
diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py
index 3fc47d9..0d97b3a 100644
--- a/system_test/replication_testsuite/replica_basic_test.py
+++ b/system_test/replication_testsuite/replica_basic_test.py
@@ -183,8 +183,8 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                 kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv)
                 self.anonLogger.info("sleeping for 5s")
                 time.sleep(5)
-
-                if autoCreateTopic.lower() == "false":
+                
+                '''if autoCreateTopic.lower() == "false":
                     self.log_message("creating topics")
                     kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
                     self.anonLogger.info("sleeping for 5s")
@@ -443,13 +443,15 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                 # build dashboard, one for each role
                 metrics.build_all_dashboards(self.systemTestEnv.METRICS_PATHNAME,
                                              self.testcaseEnv.testCaseDashboardsDir,
-                                             self.systemTestEnv.clusterEntityConfigDictList)
+                                             self.systemTestEnv.clusterEntityConfigDictList)'''
             except Exception as e:
                 self.log_message("Exception while running test {0}".format(e))
                 traceback.print_exc()
+           
 
-            finally:
+ 
+            '''finally:
                 if not skipThisTestCase and not self.systemTestEnv.printTestDescriptionsOnly:
                     self.log_message("stopping all entities - please wait ...")
-                    kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)
+                    kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)'''
 
diff --git a/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json b/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
index 10dc33b..7594eec 100644
--- a/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
+++ b/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
@@ -32,7 +32,8 @@
       "log.segment.size": "10240",
       "log.dir": "/tmp/kafka_server_1_logs",
       "log_filename": "kafka_server_9091.log",
-      "config_filename": "kafka_server_9091.properties"
+      "config_filename": "kafka_server_9091.properties",
+      "default.replication.factor": "3"
     },
     {
       "entity_id": "2",
@@ -41,7 +42,8 @@
       "log.segment.size": "10240",
       "log.dir": "/tmp/kafka_server_2_logs",
       "log_filename": "kafka_server_9092.log",
-      "config_filename": "kafka_server_9092.properties"
+      "config_filename": "kafka_server_9092.properties",
+      "default.replication.factor": "3"
     },
     {
       "entity_id": "3",
@@ -50,7 +52,8 @@
       "log.segment.size": "10240",
       "log.dir": "/tmp/kafka_server_3_logs",
       "log_filename": "kafka_server_9093.log",
-      "config_filename": "kafka_server_9093.properties"
+      "config_filename": "kafka_server_9093.properties",
+      "default.replication.factor": "3"
     },
     {
       "entity_id": "4",
-- 
1.7.1

