Index: core/src/test/scala/unit/kafka/network/SocketServerTest.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/test/scala/unit/kafka/network/SocketServerTest.scala (revision ee1267b127f3081db491fa1bf9a287084c324e36) +++ core/src/test/scala/unit/kafka/network/SocketServerTest.scala (revision ) @@ -17,19 +17,16 @@ package kafka.network; -import java.net._ import java.io._ -import org.junit._ -import org.scalatest.junit.JUnitSuite +import java.nio.channels.SelectionKey import java.util.Random + import junit.framework.Assert._ -import kafka.producer.SyncProducerConfig -import kafka.api.ProducerRequest -import java.nio.ByteBuffer -import kafka.common.TopicAndPartition -import kafka.message.ByteBufferMessageSet -import java.nio.channels.SelectionKey +import kafka.network.SocketServerTestUtils._ import kafka.utils.TestUtils +import org.junit._ +import org.scalatest.junit.JUnitSuite + import scala.collection.Map class SocketServerTest extends JUnitSuite { @@ -43,38 +40,12 @@ recvBufferSize = 300000, maxRequestSize = 50, maxConnectionsPerIp = 5, - connectionsMaxIdleMs = 60*1000, + connectionsMaxIdleMs = 600, maxConnectionsPerIpOverrides = Map.empty[String,Int]) server.startup() - def sendRequest(socket: Socket, id: Short, request: Array[Byte]) { - val outgoing = new DataOutputStream(socket.getOutputStream) - outgoing.writeInt(request.length + 2) - outgoing.writeShort(id) - outgoing.write(request) - outgoing.flush() - } + def connect(s:SocketServer = server) = SocketServerTestUtils.connect(s) - def receiveResponse(socket: Socket): Array[Byte] = { - val incoming = new DataInputStream(socket.getInputStream) - val len = incoming.readInt() - val response = new Array[Byte](len) - incoming.readFully(response) - response - } - - /* A simple request handler that just echos back the response */ - def processRequest(channel: RequestChannel) { - val request = channel.receiveRequest - val byteBuffer = ByteBuffer.allocate(request.requestObj.sizeInBytes) - request.requestObj.writeTo(byteBuffer) - byteBuffer.rewind() - val send = new BoundedByteBufferSend(byteBuffer) - channel.sendResponse(new RequestChannel.Response(request.processor, request, send)) - } - - def connect(s:SocketServer = server) = new Socket("localhost", s.port) - @After def cleanup() { server.shutdown() @@ -82,19 +53,7 @@ @Test def simpleRequest() { val socket = connect() - val correlationId = -1 - val clientId = SyncProducerConfig.DefaultClientId - val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs - val ack = SyncProducerConfig.DefaultRequiredAcks - val emptyRequest = - new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]()) - - val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes) - emptyRequest.writeTo(byteBuffer) - byteBuffer.rewind() - val serializedBytes = new Array[Byte](byteBuffer.remaining) - byteBuffer.get(serializedBytes) - + val serializedBytes = newRequestBytes sendRequest(socket, 0, serializedBytes) processRequest(server.requestChannel) assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq) Index: core/src/main/scala/kafka/network/SocketServer.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/network/SocketServer.scala (revision ee1267b127f3081db491fa1bf9a287084c324e36) +++ core/src/main/scala/kafka/network/SocketServer.scala (revision ) @@ -306,7 +306,7 @@ private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() private val connectionsMaxIdleNanos = connectionsMaxIdleMs * 1000 * 1000 private var currentTimeNanos = SystemTime.nanoseconds - private val lruConnections = new util.LinkedHashMap[SelectionKey, Long] + private val lruConnections = new util.LinkedHashMap[SelectionKey, Long](16, .75F, true) private var nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos override def run() { Index: core/src/test/scala/unit/kafka/network/SocketServerMaxIdleTest.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/test/scala/unit/kafka/network/SocketServerMaxIdleTest.scala (revision ) +++ core/src/test/scala/unit/kafka/network/SocketServerMaxIdleTest.scala (revision ) @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import java.io._ + +import kafka.network.SocketServerTestUtils._ +import org.junit._ +import org.scalatest.junit.JUnitSuite + +import scala.collection.Map + +class SocketServerMaxIdleTest extends JUnitSuite { + + val server: SocketServer = new SocketServer(0, + host = null, + port = kafka.utils.TestUtils.choosePort, + numProcessorThreads = 1, + maxQueuedRequests = 50, + sendBufferSize = 300000, + recvBufferSize = 300000, + maxRequestSize = 50, + maxConnectionsPerIp = 5, + connectionsMaxIdleMs = 50, + maxConnectionsPerIpOverrides = Map.empty[String,Int]) + server.startup() + + @Test + def testSocketsCloseAfterMaxIdle() { + //do a request response cycle + val socket = connect(server) + val serializedBytes = newRequestBytes + + sendRequest(socket, 0, serializedBytes) + processRequest(server.requestChannel) + receiveResponse(socket) + + // then wait for remaining to max idle time + some more to allow a epoll cycle + Thread.sleep(server.connectionsMaxIdleMs * 15 /10 + 450) + // doing a subsequent send should throw an exception as the connection should be closed + try { + sendRequest(socket, 0, serializedBytes) + sendRequest(socket, 0, serializedBytes) + fail("Expecting I/O error") + } catch { + case (e: IOException) => //good + } + } +} + Index: core/src/test/scala/unit/kafka/network/SocketServerTestUtils.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/test/scala/unit/kafka/network/SocketServerTestUtils.scala (revision ) +++ core/src/test/scala/unit/kafka/network/SocketServerTestUtils.scala (revision ) @@ -0,0 +1,60 @@ +package kafka.network + +import java.io.{DataInputStream, DataOutputStream} +import java.net.Socket +import java.nio.ByteBuffer + +import kafka.api.ProducerRequest +import kafka.common.TopicAndPartition +import kafka.message.ByteBufferMessageSet +import kafka.producer.SyncProducerConfig + +/** + * Created by dnmaras on 6/1/15. + */ +object SocketServerTestUtils { + + def connect(s:SocketServer) = new Socket("localhost", s.port) + + def sendRequest(socket: Socket, id: Short, request: Array[Byte]) { + val outgoing = new DataOutputStream(socket.getOutputStream) + outgoing.writeInt(request.length + 2) + outgoing.writeShort(id) + outgoing.write(request) + outgoing.flush() + } + + def receiveResponse(socket: Socket): Array[Byte] = { + val incoming = new DataInputStream(socket.getInputStream) + val len = incoming.readInt() + val response = new Array[Byte](len) + incoming.readFully(response) + response + } + + /* A simple request handler that just echos back the response */ + def processRequest(channel: RequestChannel) { + val request = channel.receiveRequest + val byteBuffer = ByteBuffer.allocate(request.requestObj.sizeInBytes) + request.requestObj.writeTo(byteBuffer) + byteBuffer.rewind() + val send = new BoundedByteBufferSend(byteBuffer) + channel.sendResponse(new RequestChannel.Response(request.processor, request, send)) + } + + def newRequestBytes: Array[Byte] = { + val correlationId = -1 + val clientId = SyncProducerConfig.DefaultClientId + val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs + val ack = SyncProducerConfig.DefaultRequiredAcks + val emptyRequest = + new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]()) + + val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes) + emptyRequest.writeTo(byteBuffer) + byteBuffer.rewind() + val serializedBytes = new Array[Byte](byteBuffer.remaining) + byteBuffer.get(serializedBytes) + serializedBytes + } +}