From 5db8acd632001804f75d7bd37c66281f4910cda5 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 21 Jul 2015 16:01:22 -0700 Subject: [PATCH] Catch exception in kafka.network.Processor to avoid socket leak and exiting unexpectedly. --- .../main/scala/kafka/network/SocketServer.scala | 96 ++++++++++++---------- 1 file changed, 52 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 91319fa..cbe51c5 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -357,49 +357,52 @@ private[kafka] class Processor(val id: Int, override def run() { startupComplete() while(isRunning) { - // setup any new connections that have been queued up - configureNewConnections() - // register any new responses for writing - processNewResponses() - try { - selector.poll(300) - } catch { - case e @ (_: IllegalStateException | _: IOException) => { - error("Closing processor %s due to illegal state or IO exception".format(id)) - swallow(closeAll()) - shutdownComplete() - throw e - } - case e: InvalidReceiveException => - // Log warning and continue since Selector already closed the connection - warn("Connection was closed due to invalid receive. Processor will continue handling other connections") - } - collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach( receive => { + // setup any new connections that have been queued up + configureNewConnections() + // register any new responses for writing + processNewResponses() + try { - val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT) - requestChannel.sendRequest(req) + selector.poll(300) } catch { - case e @ (_: InvalidRequestException | _: SchemaException) => { - // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier - error("Closing socket for " + receive.source + " because of error", e) - selector.close(receive.source) + case e@(_: IllegalStateException | _: IOException) => { + error("Closing processor %s due to illegal state or IO exception".format(id)) + swallow(closeAll()) + shutdownComplete() + throw e } + case e: InvalidReceiveException => + // Log warning and continue since Selector already closed the connection + warn("Connection was closed due to invalid receive. Processor will continue handling other connections") } - selector.mute(receive.source) - }) - - collection.JavaConversions.iterableAsScalaIterable(selector.completedSends()).foreach( send => { - val resp = inflightResponses.remove(send.destination()).get - resp.request.updateRequestMetrics() - selector.unmute(send.destination()) - }) + collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach(receive => { + try { + val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT) + requestChannel.sendRequest(req) + } catch { + case e@(_: InvalidRequestException | _: SchemaException) => { + // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier + error("Closing socket for " + receive.source + " because of error", e) + selector.close(receive.source) + } + } + selector.mute(receive.source) + }) + + collection.JavaConversions.iterableAsScalaIterable(selector.completedSends()).foreach(send => { + val resp = inflightResponses.remove(send.destination()).get + resp.request.updateRequestMetrics() + selector.unmute(send.destination()) + }) + } catch { + case e : Throwable => + error("Processor got uncaught exception.", e) + } } - - debug("Closing selector - processor " + id) - closeAll() + swallowError(closeAll()) shutdownComplete() } @@ -426,8 +429,6 @@ private[kafka] class Processor(val id: Int, selector.close(curr.request.connectionId) } } - - } finally { curr = requestChannel.receiveResponse(id) } @@ -448,13 +449,20 @@ private[kafka] class Processor(val id: Int, private def configureNewConnections() { while(!newConnections.isEmpty) { val channel = newConnections.poll() - debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress) - val localHost = channel.socket().getLocalAddress.getHostAddress - val localPort = channel.socket().getLocalPort - val remoteHost = channel.socket().getInetAddress.getHostAddress - val remotePort = channel.socket().getPort - val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort - selector.register(connectionId, channel) + try { + debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress) + val localHost = channel.socket().getLocalAddress.getHostAddress + val localPort = channel.socket().getLocalPort + val remoteHost = channel.socket().getInetAddress.getHostAddress + val remotePort = channel.socket().getPort + val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort + selector.register(connectionId, channel) + } catch { + case e : Throwable => + // need to close the channel here to avoid socket leak. + close(channel) + error("Processor " + id + " closed connection from " + channel.getRemoteAddress, e) + } } } -- 1.8.3.4 (Apple Git-47)