Index: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/network/SocketServerTest.scala (revision 1228397)
+++ core/src/test/scala/unit/kafka/network/SocketServerTest.scala (working copy)
@@ -42,6 +42,8 @@
                                 numProcessorThreads = 1,
                                 monitoringPeriodSecs = 30,
                                 handlerFactory = (requestId: Short, receive: Receive) => echo,
+                                sendBufferSize = 300000,
+                                receiveBufferSize = 300000,
                                 maxRequestSize = 50)
   server.startup()
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala (revision 1228397)
+++ core/src/main/scala/kafka/server/KafkaServer.scala (working copy)
@@ -66,6 +66,8 @@
                                     config.numThreads,
                                     config.monitoringPeriodSecs,
                                     handlers.handlerFor,
+                                    config.socketSendBuffer,
+                                    config.socketReceiveBuffer,
                                     config.maxSocketRequestSize)
     Utils.registerMBean(socketServer.stats, statsMBeanName)
     socketServer.startup()
Index: core/src/main/scala/kafka/network/SocketServer.scala
===================================================================
--- core/src/main/scala/kafka/network/SocketServer.scala (revision 1228397)
+++ core/src/main/scala/kafka/network/SocketServer.scala (working copy)
@@ -38,11 +38,13 @@
                                  val numProcessorThreads: Int,
                                  monitoringPeriodSecs: Int,
                                  private val handlerFactory: Handler.HandlerMapping,
+                                 val sendBufferSize: Int,
+                                 val receiveBufferSize: Int,
                                  val maxRequestSize: Int = Int.MaxValue) {
 
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
-  private var acceptor: Acceptor = new Acceptor(port, processors)
+  private var acceptor: Acceptor = new Acceptor(port, processors, sendBufferSize, receiveBufferSize)
   val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs)
 
   /**
@@ -116,7 +118,7 @@
 /**
  * Thread that accepts and configures new connections. There is only need for one of these
  */
-private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
+private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor], val sendBufferSize: Int, val receiveBufferSize: Int) extends AbstractServerThread {
 
   /**
    * Accept loop that checks for new connection attempts
@@ -164,14 +166,21 @@
    * Accept a new connection
    */
   def accept(key: SelectionKey, processor: Processor) {
-    val socketChannel = key.channel().asInstanceOf[ServerSocketChannel].accept()
-    if(logger.isDebugEnabled)
-      logger.info("Accepted connection from " + socketChannel.socket.getInetAddress() + " on " + socketChannel.socket.getLocalSocketAddress)
+    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
+    serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize)
+
+    val socketChannel = serverSocketChannel.accept()
     socketChannel.configureBlocking(false)
     socketChannel.socket().setTcpNoDelay(true)
+    socketChannel.socket().setSendBufferSize(sendBufferSize)
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("sendBufferSize: [" + socketChannel.socket().getSendBufferSize() +
+        "] receiveBufferSize: [" + socketChannel.socket().getReceiveBufferSize() + "]")
+    }
+
     processor.accept(socketChannel)
-
   }
 
   /**
Index: core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (revision 1228397)
+++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (working copy)
@@ -96,7 +96,7 @@
     val props = new Properties()
     props.put("groupid", options.valueOf(groupIdOpt))
-    props.put("socket.buffer.size", options.valueOf(socketBufferSizeOpt).toString)
+    props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)
    props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
     props.put("auto.commit", "true")
     props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
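
Note (illustration, not part of the patch): the change relies on the standard java.nio pattern of sizing socket buffers around accept() -- SO_RCVBUF is set on the listening ServerSocket so that accepted connections pick it up (and so the TCP receive window can be sized from it), while SO_SNDBUF and TCP_NODELAY are set on each accepted channel. The sketch below is a minimal, self-contained version of that pattern; the object name, port, and 300000-byte sizes are illustrative stand-ins rather than identifiers from the patch, which wires the sizes through config.socketSendBuffer and config.socketReceiveBuffer instead.

import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel

object SocketBufferSketch {
  // Illustrative sizes; the patch takes these from broker config rather than hard-coding them.
  val sendBufferSize = 300000
  val receiveBufferSize = 300000

  def main(args: Array[String]): Unit = {
    val serverChannel = ServerSocketChannel.open()
    // Set SO_RCVBUF on the listening socket before connections arrive so that
    // accepted sockets inherit the value.
    serverChannel.socket().setReceiveBufferSize(receiveBufferSize)
    serverChannel.socket().bind(new InetSocketAddress(9999))

    // Blocks until a client connects (the patch does this non-blocking via a Selector).
    val socketChannel = serverChannel.accept()
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)

    // The OS may round or cap the requested sizes, so report what was actually granted.
    println("sendBufferSize: [" + socketChannel.socket().getSendBufferSize() +
      "] receiveBufferSize: [" + socketChannel.socket().getReceiveBufferSize() + "]")

    socketChannel.close()
    serverChannel.close()
  }
}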