From 3efbbd7041e9eff44a2fb5dc9679ba4b60885cf3 Mon Sep 17 00:00:00 2001
From: Joel Koshy <jjkoshy@gmail.com>
Date: Mon, 22 Apr 2013 16:21:05 -0700
Subject: [PATCH] Fix v1

---
 .../main/scala/kafka/network/SocketServer.scala    |   20 ++++++++++++++++----
 core/src/main/scala/kafka/server/KafkaServer.scala |    2 ++
 .../unit/kafka/network/SocketServerTest.scala      |    2 ++
 3 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 648d936..5a44c28 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -37,6 +37,8 @@ class SocketServer(val brokerId: Int,
                    val port: Int,
                    val numProcessorThreads: Int,
                    val maxQueuedRequests: Int,
+                   val sendBufferSize: Int,
+                   val recvBufferSize: Int,
                    val maxRequestSize: Int = Int.MaxValue) extends Logging {
   this.logIdent = "[Socket Server on Broker " + brokerId + "], "
   private val time = SystemTime
@@ -56,7 +58,7 @@ class SocketServer(val brokerId: Int,
     requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
    
     // start accepting connections
-    this.acceptor = new Acceptor(host, port, processors)
+    this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize)
     Utils.newThread("kafka-acceptor", acceptor, false).start()
     acceptor.awaitStartup
     info("started")
@@ -128,7 +130,8 @@ private[kafka] abstract class AbstractServerThread extends Runnable with Logging
 /**
  * Thread that accepts and configures new connections. There is only need for one of these
  */
-private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
+private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor],
+                              val sendBufferSize: Int, val recvBufferSize: Int) extends AbstractServerThread {
   val serverChannel = openServerSocket(host, port)
 
   /**
@@ -192,10 +195,19 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce
    * Accept a new connection
    */
   def accept(key: SelectionKey, processor: Processor) {
-    val socketChannel = key.channel().asInstanceOf[ServerSocketChannel].accept()
-    debug("Accepted connection from " + socketChannel.socket.getInetAddress() + " on " + socketChannel.socket.getLocalSocketAddress)
+    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
+    serverSocketChannel.socket().setReceiveBufferSize(recvBufferSize)
+
+    val socketChannel = serverSocketChannel.accept()
     socketChannel.configureBlocking(false)
     socketChannel.socket().setTcpNoDelay(true)
+    socketChannel.socket().setSendBufferSize(sendBufferSize)
+
+    debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
+          .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress,
+                  socketChannel.socket.getSendBufferSize, sendBufferSize,
+                  socketChannel.socket.getReceiveBufferSize, recvBufferSize))
+
     processor.accept(socketChannel)
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7298ccb..b4a57c6 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -66,6 +66,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                     config.port,
                                     config.numNetworkThreads,
                                     config.queuedMaxRequests,
+                                    config.socketSendBufferBytes,
+                                    config.socketReceiveBufferBytes,
                                     config.socketRequestMaxBytes)
 
     socketServer.startup
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index b347e66..94b5a2a 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -38,6 +38,8 @@ class SocketServerTest extends JUnitSuite {
                                               port = kafka.utils.TestUtils.choosePort,
                                               numProcessorThreads = 1,
                                               maxQueuedRequests = 50,
+                                              sendBufferSize = 300000,
+                                              recvBufferSize = 300000,
                                               maxRequestSize = 50)
   server.startup()
 
-- 
1.7.1

