Index: core/src/main/scala/kafka/network/SocketServer.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/network/SocketServer.scala (date 1408496001000) +++ core/src/main/scala/kafka/network/SocketServer.scala (revision ) @@ -16,7 +16,7 @@ */ package kafka.network - +import java.util import java.util.concurrent._ import java.util.concurrent.atomic._ import java.net._ @@ -29,6 +29,7 @@ import kafka.metrics.KafkaMetricsGroup import kafka.utils._ import com.yammer.metrics.core.Meter +import java.util.Map.Entry /** * An NIO socket server. The threading model is @@ -283,7 +284,11 @@ connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) { private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() - + private val connectionsLruTimeout: Long = TimeUnit.MINUTES.toNanos(10) + private var currentTime: Long = SystemTime.nanoseconds + private val lruConnections = new util.LinkedHashMap[SelectionKey, Long](100, .75F, true) { + override def removeEldestEntry(eldest: Entry[SelectionKey, Long]): Boolean = currentTime - eldest.getValue >= connectionsLruTimeout + } override def run() { startupComplete() while(isRunning) { @@ -293,7 +298,8 @@ processNewResponses() val startSelectTime = SystemTime.nanoseconds val ready = selector.select(300) - val idleTime = SystemTime.nanoseconds - startSelectTime + currentTime = SystemTime.nanoseconds + val idleTime = currentTime - startSelectTime idleMeter.mark(idleTime) // We use a single meter for aggregate idle percentage for the thread pool. // Since meter is calculated as total_recorded_value / time_window and @@ -339,6 +345,16 @@ shutdownComplete() } + /** + * Close the given key and associated socket + */ + override def close(key: SelectionKey): Unit = { + if(key != null) { + lruConnections.remove(key) + } + super.close(key) + } + private def processNewResponses() { var curr = requestChannel.receiveResponse(id) while(curr != null) { @@ -399,6 +415,7 @@ * Process reads from ready sockets */ def read(key: SelectionKey) { + lruConnections.put(key, currentTime) val socketChannel = channelFor(key) var receive = key.attachment.asInstanceOf[Receive] if(key.attachment == null) { @@ -428,6 +445,7 @@ * Process writes to ready sockets */ def write(key: SelectionKey) { + lruConnections.put(key, currentTime) val socketChannel = channelFor(key) val response = key.attachment().asInstanceOf[RequestChannel.Response] val responseSend = response.responseSend