From b457876ff3237a805e170c5e926d8e8e3446abeb Mon Sep 17 00:00:00 2001 From: jholoman Date: Mon, 5 Jan 2015 11:01:19 -0500 Subject: [PATCH] KAFKA-1512-update --- .../main/scala/kafka/network/SocketServer.scala | 2 +- core/src/main/scala/kafka/server/KafkaServer.scala | 3 +- .../unit/kafka/network/SocketServerTest.scala | 40 ++++++++++++++++------ 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index e451592..39b1651 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -47,7 +47,7 @@ class SocketServer(val brokerId: Int, val maxRequestSize: Int = Int.MaxValue, val maxConnectionsPerIp: Int = Int.MaxValue, val connectionsMaxIdleMs: Long, - val maxConnectionsPerIpOverrides: Map[String, Int] = Map[String, Int]()) extends Logging with KafkaMetricsGroup { + val maxConnectionsPerIpOverrides: Map[String, Int] ) extends Logging with KafkaMetricsGroup { this.logIdent = "[Socket Server on Broker " + brokerId + "], " private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 1bf7d10..1691ad7 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -94,7 +94,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.socketReceiveBufferBytes, config.socketRequestMaxBytes, config.maxConnectionsPerIp, - config.connectionsMaxIdleMs) + config.connectionsMaxIdleMs, + config.maxConnectionsPerIpOverrides) socketServer.startup() replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 5f4d852..6c8a4f7 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -30,6 +30,7 @@ import kafka.common.TopicAndPartition import kafka.message.ByteBufferMessageSet import java.nio.channels.SelectionKey import kafka.utils.TestUtils +import scala.collection.Map class SocketServerTest extends JUnitSuite { @@ -42,7 +43,8 @@ class SocketServerTest extends JUnitSuite { recvBufferSize = 300000, maxRequestSize = 50, maxConnectionsPerIp = 5, - connectionsMaxIdleMs = 60*1000) + connectionsMaxIdleMs = 60*1000, + maxConnectionsPerIpOverrides = Map.empty[String,Int]) server.startup() def sendRequest(socket: Socket, id: Short, request: Array[Byte]) { @@ -71,13 +73,12 @@ class SocketServerTest extends JUnitSuite { channel.sendResponse(new RequestChannel.Response(request.processor, request, send)) } - def connect() = new Socket("localhost", server.port) + def connect(s:SocketServer = server) = new Socket("localhost", s.port) @After def cleanup() { server.shutdown() } - @Test def simpleRequest() { val socket = connect() @@ -141,19 +142,38 @@ class SocketServerTest extends JUnitSuite { // doing a subsequent send should throw an exception as the connection should be closed. sendRequest(socket, 0, bytes) } - + @Test def testMaxConnectionsPerIp() { // make the maximum allowable number of connections and then leak them val conns = (0 until server.maxConnectionsPerIp).map(i => connect()) - // now try one more (should fail) - try { val conn = connect() - sendRequest(conn, 100, "hello".getBytes) + conn.setSoTimeout(3000) assertEquals(-1, conn.getInputStream().read()) - } catch { - case e: IOException => // this is good - } + } + @Test + def testMaxConnectionsPerIPOverrides(): Unit = { + val overrideNum = 6 + val overrides: Map[String, Int] = Map("localhost" -> overrideNum) + val overrideServer: SocketServer = new SocketServer(0, + host = null, + port = kafka.utils.TestUtils.choosePort, + numProcessorThreads = 1, + maxQueuedRequests = 50, + sendBufferSize = 300000, + recvBufferSize = 300000, + maxRequestSize = 50, + maxConnectionsPerIp = 5, + connectionsMaxIdleMs = 60*1000, + maxConnectionsPerIpOverrides = overrides) + overrideServer.startup() + // make the maximum allowable number of connections and then leak them + val conns = ((0 until overrideNum).map(i => connect(overrideServer))) + // now try one more (should fail) + val conn = connect(overrideServer) + conn.setSoTimeout(3000) + assertEquals(-1, conn.getInputStream.read()) + overrideServer.shutdown() } } -- 1.8.4