diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index b757abd..59a8c6b 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -209,7 +209,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ debug(s"Closing selector connection $connectionId") val address = channel.socketAddress if (address != null) - connectionQuotas.dec(address) + connectionQuotas.dec(address, channel.id) selector.close(connectionId) } } @@ -220,7 +220,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ def close(channel: SocketChannel) { if (channel != null) { debug("Closing connection from " + channel.socket.getRemoteSocketAddress()) - connectionQuotas.dec(channel.socket.getInetAddress) + connectionQuotas.dec(channel.socket.getInetAddress, "unknown id") swallowError(channel.socket().close()) swallowError(channel.close()) } @@ -514,7 +514,7 @@ private[kafka] class Processor(val id: Int, }.remoteHost inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics()) // the channel has been closed by the selector but the quotas still need to be updated - connectionQuotas.dec(InetAddress.getByName(remoteHost)) + connectionQuotas.dec(InetAddress.getByName(remoteHost), connectionId) } } @@ -588,10 +588,10 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) { } } - def dec(address: InetAddress) { + def dec(address: InetAddress, connectionId: String) { counts.synchronized { val count = counts.getOrElse(address, - throw new IllegalArgumentException(s"Attempted to decrease connection count for address with no connections, address: $address")) + throw new IllegalArgumentException(s"Attempted to decrease connection count for address with no connections, address: $address, connectionId: $connectionId")) if (count == 1) counts.remove(address) else