From 102f359ce9b0e2f7f1bc7afe442e11e14da8b2a3 Mon Sep 17 00:00:00 2001 From: jholoman Date: Sun, 14 Dec 2014 12:15:31 -0500 Subject: [PATCH 1/3] KAFKA-1512 wire in overrides per previous patch KAFKA-1512 write in overrides per previous patch KAFKA-1512 wire in Override configuration KAFKA-1512 wire in overrides KAFKA-1512 test mods --- .../main/scala/kafka/network/SocketServer.scala | 2 +- core/src/main/scala/kafka/server/KafkaServer.scala | 3 +- .../unit/kafka/network/SocketServerTest.scala | 43 ++++++++++++++++------ 3 files changed, 35 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index e451592..39b1651 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -47,7 +47,7 @@ class SocketServer(val brokerId: Int, val maxRequestSize: Int = Int.MaxValue, val maxConnectionsPerIp: Int = Int.MaxValue, val connectionsMaxIdleMs: Long, - val maxConnectionsPerIpOverrides: Map[String, Int] = Map[String, Int]()) extends Logging with KafkaMetricsGroup { + val maxConnectionsPerIpOverrides: Map[String, Int] ) extends Logging with KafkaMetricsGroup { this.logIdent = "[Socket Server on Broker " + brokerId + "], " private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 1bf7d10..1691ad7 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -94,7 +94,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.socketReceiveBufferBytes, config.socketRequestMaxBytes, config.maxConnectionsPerIp, - config.connectionsMaxIdleMs) + config.connectionsMaxIdleMs, + config.maxConnectionsPerIpOverrides) socketServer.startup() replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 5f4d852..a5cbbe9 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -25,11 +25,13 @@ import java.util.Random import junit.framework.Assert._ import kafka.producer.SyncProducerConfig import kafka.api.ProducerRequest -import java.nio.ByteBuffer +import java.nio.{BufferUnderflowException, ByteBuffer} import kafka.common.TopicAndPartition import kafka.message.ByteBufferMessageSet import java.nio.channels.SelectionKey import kafka.utils.TestUtils +import kafka.utils.Utils +import scala.collection.Map class SocketServerTest extends JUnitSuite { @@ -42,7 +44,8 @@ class SocketServerTest extends JUnitSuite { recvBufferSize = 300000, maxRequestSize = 50, maxConnectionsPerIp = 5, - connectionsMaxIdleMs = 60*1000) + connectionsMaxIdleMs = 60*1000, + maxConnectionsPerIpOverrides = Map.empty[String,Int]) server.startup() def sendRequest(socket: Socket, id: Short, request: Array[Byte]) { @@ -71,13 +74,12 @@ class SocketServerTest extends JUnitSuite { channel.sendResponse(new RequestChannel.Response(request.processor, request, send)) } - def connect() = new Socket("localhost", server.port) + def connect(s:SocketServer = server) = new Socket("localhost", s.port) @After def cleanup() { server.shutdown() } - @Test def simpleRequest() { val socket = connect() @@ -141,19 +143,38 @@ class SocketServerTest extends JUnitSuite { // doing a subsequent send should throw an exception as the connection should be closed. sendRequest(socket, 0, bytes) } - + @Test def testMaxConnectionsPerIp() { // make the maximum allowable number of connections and then leak them val conns = (0 until server.maxConnectionsPerIp).map(i => connect()) - // now try one more (should fail) - try { val conn = connect() - sendRequest(conn, 100, "hello".getBytes) + conn.setSoTimeout(3000) assertEquals(-1, conn.getInputStream().read()) - } catch { - case e: IOException => // this is good - } + } + @Test + def testMaxConnectionsPerIPOverrides(): Unit = { + val overrideNum = 6 + val overrides: Map[String, Int] = Map("localhost" -> overrideNum) + val overrideServer: SocketServer = new SocketServer(0, + host = null, + port = kafka.utils.TestUtils.choosePort, + numProcessorThreads = 1, + maxQueuedRequests = 50, + sendBufferSize = 300000, + recvBufferSize = 300000, + maxRequestSize = 50, + maxConnectionsPerIp = 5, + connectionsMaxIdleMs = 60*1000, + maxConnectionsPerIpOverrides = overrides) + overrideServer.startup() + // make the maximum allowable number of connections and then leak them + val conns = ((0 until overrideNum).map(i => connect(overrideServer))) + // now try one more (should fail) + val conn = connect(overrideServer) + conn.setSoTimeout(3000) + assertEquals(-1, conn.getInputStream.read()) + overrideServer.shutdown() } } -- 1.8.4 From 3da6585ee1d63dbca9533c4d70fd4b2a3d2da47c Mon Sep 17 00:00:00 2001 From: jholoman Date: Tue, 23 Dec 2014 21:30:48 -0500 Subject: [PATCH 2/3] KAFKA-1512 IP Overrides --- core/src/test/scala/unit/kafka/network/SocketServerTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index a5cbbe9..7c048b6 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -25,12 +25,11 @@ import java.util.Random import junit.framework.Assert._ import kafka.producer.SyncProducerConfig import kafka.api.ProducerRequest -import java.nio.{BufferUnderflowException, ByteBuffer} +import java.nio.ByteBuffer import kafka.common.TopicAndPartition import kafka.message.ByteBufferMessageSet import java.nio.channels.SelectionKey import kafka.utils.TestUtils -import kafka.utils.Utils import scala.collection.Map class SocketServerTest extends JUnitSuite { @@ -153,6 +152,7 @@ class SocketServerTest extends JUnitSuite { conn.setSoTimeout(3000) assertEquals(-1, conn.getInputStream().read()) } + @Test def testMaxConnectionsPerIPOverrides(): Unit = { val overrideNum = 6 -- 1.8.4 From f83b769853268ba584d94f34059532972c224731 Mon Sep 17 00:00:00 2001 From: jholoman Date: Tue, 23 Dec 2014 21:33:45 -0500 Subject: [PATCH 3/3] KAFKA-1512 IP Overrides --- core/src/test/scala/unit/kafka/network/SocketServerTest.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 7c048b6..78b431f 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -148,9 +148,9 @@ class SocketServerTest extends JUnitSuite { // make the maximum allowable number of connections and then leak them val conns = (0 until server.maxConnectionsPerIp).map(i => connect()) // now try one more (should fail) - val conn = connect() - conn.setSoTimeout(3000) - assertEquals(-1, conn.getInputStream().read()) + val conn = connect() + conn.setSoTimeout(3000) + assertEquals(-1, conn.getInputStream().read()) } @Test @@ -173,7 +173,7 @@ class SocketServerTest extends JUnitSuite { val conns = ((0 until overrideNum).map(i => connect(overrideServer))) // now try one more (should fail) val conn = connect(overrideServer) - conn.setSoTimeout(3000) + conn.setSoTimeout(3000) assertEquals(-1, conn.getInputStream.read()) overrideServer.shutdown() } -- 1.8.4