Index: core/src/test/scala/unit/kafka/network/SocketServerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/network/SocketServerTest.scala (revision 1200922) +++ core/src/test/scala/unit/kafka/network/SocketServerTest.scala (working copy) @@ -42,6 +42,8 @@ numProcessorThreads = 1, monitoringPeriodSecs = 30, handlerFactory = (requestId: Short, receive: Receive) => echo, + sendBufferSize = 300000, + receiveBufferSize = 300000, maxRequestSize = 50) server.startup() Index: core/src/main/scala/kafka/server/KafkaServer.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaServer.scala (revision 1200922) +++ core/src/main/scala/kafka/server/KafkaServer.scala (working copy) @@ -69,6 +69,8 @@ config.numThreads, config.monitoringPeriodSecs, handlers.handlerFor, + config.socketSendBuffer, + config.socketReceiveBuffer, config.maxSocketRequestSize) Utils.swallow(logger.warn, Utils.registerMBean(socketServer.stats, statsMBeanName)) socketServer.startup Index: core/src/main/scala/kafka/network/SocketServer.scala =================================================================== --- core/src/main/scala/kafka/network/SocketServer.scala (revision 1200922) +++ core/src/main/scala/kafka/network/SocketServer.scala (working copy) @@ -38,12 +38,14 @@ val numProcessorThreads: Int, monitoringPeriodSecs: Int, private val handlerFactory: Handler.HandlerMapping, + val sendBufferSize: Int, + val receiveBufferSize: Int, val maxRequestSize: Int = Int.MaxValue) { private val logger = Logger.getLogger(classOf[SocketServer]) private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) - private var acceptor: Acceptor = new Acceptor(port, processors) + private var acceptor: Acceptor = new Acceptor(port, processors, sendBufferSize, receiveBufferSize) val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs) /** @@ -117,8 +119,7 @@ /** * Thread that accepts and configures new connections. There is only need for one of these */ -private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends AbstractServerThread { - +private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor], val sendBufferSize: Int, val receiveBufferSize: Int) extends AbstractServerThread { /** * Accept loop that checks for new connection attempts */ @@ -170,6 +171,14 @@ logger.info("Accepted connection from " + socketChannel.socket.getInetAddress() + " on " + socketChannel.socket.getLocalSocketAddress) socketChannel.configureBlocking(false) socketChannel.socket().setTcpNoDelay(true) + socketChannel.socket().setSendBufferSize(sendBufferSize) + socketChannel.socket().setReceiveBufferSize(receiveBufferSize) + + if (logger.isDebugEnabled()) { + logger.debug("sendBufferSize : [" + socketChannel.socket().getSendBufferSize() + "]") + logger.debug("receiveBufferSize : [" + socketChannel.socket().getReceiveBufferSize() + "]") + } + processor.accept(socketChannel) }