diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 94b5a2a..a48bf62 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -29,6 +29,7 @@ import java.nio.ByteBuffer import kafka.common.TopicAndPartition import kafka.message.ByteBufferMessageSet import java.nio.channels.SelectionKey +import kafka.utils.TestUtils class SocketServerTest extends JUnitSuite { @@ -123,19 +124,14 @@ class SocketServerTest extends JUnitSuite { byteBuffer.get(serializedBytes) sendRequest(socket, 0, serializedBytes) - sendRequest(socket, 0, serializedBytes) - // here the socket server should've read only the first request completely and since the response is not sent yet - // the selection key should not be readable val request = server.requestChannel.receiveRequest + // Since the response is not sent yet, the selection key should not be readable. Assert.assertFalse((request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ) server.requestChannel.sendResponse(new RequestChannel.Response(0, request, null)) - // if everything is working correctly, until you send a response for the first request, - // the 2nd request will not be read by the socket server - val request2 = server.requestChannel.receiveRequest - server.requestChannel.sendResponse(new RequestChannel.Response(0, request2, null)) - Assert.assertFalse((request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ) + // After the response is sent to the client (which is async and may take a bit of time), the socket key should be available for reads. + TestUtils.waitUntilTrue( () => { (request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ }, 5000) } }