From 5a3ffe417a70e22664d4d0619eca98ed67883951 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Wed, 13 Aug 2014 08:49:30 -0700 Subject: [PATCH 1/2] KAFKA-1577. Exception in ConnectionQuotas while shutting down. --- .../main/scala/kafka/network/SocketServer.scala | 52 +++++++++++----------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 9693bc0..94203db 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -61,19 +61,19 @@ class SocketServer(val brokerId: Int, def startup() { val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) for(i <- 0 until numProcessorThreads) { - processors(i) = new Processor(i, - time, - maxRequestSize, + processors(i) = new Processor(i, + time, + maxRequestSize, aggregateIdleMeter, newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS), - numProcessorThreads, + numProcessorThreads, requestChannel, quotas) Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start() } // register the processor threads for notification of responses requestChannel.addResponseListener((id:Int) => processors(id).wakeup()) - + // start accepting connections this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas) Utils.newThread("kafka-socket-acceptor", acceptor, false).start() @@ -134,12 +134,12 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ * Is the server still running? */ protected def isRunning = alive.get - + /** * Wakeup the thread for selection. */ def wakeup() = selector.wakeup() - + /** * Close the given key and associated socket */ @@ -150,7 +150,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ swallowError(key.cancel()) } } - + def close(channel: SocketChannel) { if(channel != null) { debug("Closing connection from " + channel.socket.getRemoteSocketAddress()) @@ -159,7 +159,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ swallowError(channel.close()) } } - + /** * Close all open connections */ @@ -170,16 +170,16 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ close(key) } } - + } /** * Thread that accepts and configures new connections. There is only need for one of these */ -private[kafka] class Acceptor(val host: String, - val port: Int, +private[kafka] class Acceptor(val host: String, + val port: Int, private val processors: Array[Processor], - val sendBufferSize: Int, + val sendBufferSize: Int, val recvBufferSize: Int, connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) { val serverChannel = openServerSocket(host, port) @@ -219,12 +219,12 @@ private[kafka] class Acceptor(val host: String, swallowError(selector.close()) shutdownComplete() } - + /* * Create a server socket to listen for connections on. */ def openServerSocket(host: String, port: Int): ServerSocketChannel = { - val socketAddress = + val socketAddress = if(host == null || host.trim.isEmpty) new InetSocketAddress(port) else @@ -236,7 +236,7 @@ private[kafka] class Acceptor(val host: String, serverChannel.socket.bind(socketAddress) info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, port)) } catch { - case e: SocketException => + case e: SocketException => throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e) } serverChannel @@ -281,7 +281,7 @@ private[kafka] class Processor(val id: Int, val totalProcessorThreads: Int, val requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) { - + private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() override def run() { @@ -454,7 +454,7 @@ private[kafka] class Processor(val id: Int, class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) { private val overrides = overrideQuotas.map(entry => (InetAddress.getByName(entry._1), entry._2)) private val counts = mutable.Map[InetAddress, Int]() - + def inc(addr: InetAddress) { counts synchronized { val count = counts.getOrElse(addr, 0) @@ -464,17 +464,19 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) { throw new TooManyConnectionsException(addr, max) } } - + def dec(addr: InetAddress) { counts synchronized { - val count = counts.get(addr).get - if(count == 1) - counts.remove(addr) - else - counts.put(addr, count - 1) + if(counts.contains(addr)) { + val count = counts.get(addr).get + if(count == 1) + counts.remove(addr) + else + counts.put(addr, count - 1) + } } } - + } class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException("Too many connections from %s (maximum = %d)".format(ip, count)) -- 1.8.5.2 (Apple Git-48) From f67b3ecf124a3ff7a5fc8db71bbe34654f109946 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Wed, 20 Aug 2014 19:56:37 -0700 Subject: [PATCH 2/2] KAFKA-1577. Exception in ConnectionQuotas while shutting down. --- .../main/scala/kafka/network/SocketServer.scala | 40 +++++++++++----------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 94203db..6ab77f6 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -61,19 +61,19 @@ class SocketServer(val brokerId: Int, def startup() { val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) for(i <- 0 until numProcessorThreads) { - processors(i) = new Processor(i, - time, - maxRequestSize, + processors(i) = new Processor(i, + time, + maxRequestSize, aggregateIdleMeter, newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS), - numProcessorThreads, + numProcessorThreads, requestChannel, quotas) Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start() } // register the processor threads for notification of responses requestChannel.addResponseListener((id:Int) => processors(id).wakeup()) - + // start accepting connections this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas) Utils.newThread("kafka-socket-acceptor", acceptor, false).start() @@ -134,12 +134,12 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ * Is the server still running? */ protected def isRunning = alive.get - + /** * Wakeup the thread for selection. */ def wakeup() = selector.wakeup() - + /** * Close the given key and associated socket */ @@ -150,7 +150,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ swallowError(key.cancel()) } } - + def close(channel: SocketChannel) { if(channel != null) { debug("Closing connection from " + channel.socket.getRemoteSocketAddress()) @@ -159,7 +159,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ swallowError(channel.close()) } } - + /** * Close all open connections */ @@ -170,16 +170,16 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ close(key) } } - + } /** * Thread that accepts and configures new connections. There is only need for one of these */ -private[kafka] class Acceptor(val host: String, - val port: Int, +private[kafka] class Acceptor(val host: String, + val port: Int, private val processors: Array[Processor], - val sendBufferSize: Int, + val sendBufferSize: Int, val recvBufferSize: Int, connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) { val serverChannel = openServerSocket(host, port) @@ -219,12 +219,12 @@ private[kafka] class Acceptor(val host: String, swallowError(selector.close()) shutdownComplete() } - + /* * Create a server socket to listen for connections on. */ def openServerSocket(host: String, port: Int): ServerSocketChannel = { - val socketAddress = + val socketAddress = if(host == null || host.trim.isEmpty) new InetSocketAddress(port) else @@ -236,7 +236,7 @@ private[kafka] class Acceptor(val host: String, serverChannel.socket.bind(socketAddress) info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, port)) } catch { - case e: SocketException => + case e: SocketException => throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e) } serverChannel @@ -281,7 +281,7 @@ private[kafka] class Processor(val id: Int, val totalProcessorThreads: Int, val requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) { - + private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() override def run() { @@ -454,7 +454,7 @@ private[kafka] class Processor(val id: Int, class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) { private val overrides = overrideQuotas.map(entry => (InetAddress.getByName(entry._1), entry._2)) private val counts = mutable.Map[InetAddress, Int]() - + def inc(addr: InetAddress) { counts synchronized { val count = counts.getOrElse(addr, 0) @@ -464,7 +464,7 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) { throw new TooManyConnectionsException(addr, max) } } - + def dec(addr: InetAddress) { counts synchronized { if(counts.contains(addr)) { @@ -476,7 +476,7 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) { } } } - + } class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException("Too many connections from %s (maximum = %d)".format(ip, count)) -- 1.8.5.2 (Apple Git-48)