From 5db8acd632001804f75d7bd37c66281f4910cda5 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 21 Jul 2015 16:01:22 -0700 Subject: [PATCH 1/2] Catch exception in kafka.network.Processor to avoid socket leak and exiting unexpectedly. --- .../main/scala/kafka/network/SocketServer.scala | 96 ++++++++++++---------- 1 file changed, 52 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 91319fa..cbe51c5 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -357,49 +357,52 @@ private[kafka] class Processor(val id: Int, override def run() { startupComplete() while(isRunning) { - // setup any new connections that have been queued up - configureNewConnections() - // register any new responses for writing - processNewResponses() - try { - selector.poll(300) - } catch { - case e @ (_: IllegalStateException | _: IOException) => { - error("Closing processor %s due to illegal state or IO exception".format(id)) - swallow(closeAll()) - shutdownComplete() - throw e - } - case e: InvalidReceiveException => - // Log warning and continue since Selector already closed the connection - warn("Connection was closed due to invalid receive. Processor will continue handling other connections") - } - collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach( receive => { + // setup any new connections that have been queued up + configureNewConnections() + // register any new responses for writing + processNewResponses() + try { - val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT) - requestChannel.sendRequest(req) + selector.poll(300) } catch { - case e @ (_: InvalidRequestException | _: SchemaException) => { - // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier - error("Closing socket for " + receive.source + " because of error", e) - selector.close(receive.source) + case e@(_: IllegalStateException | _: IOException) => { + error("Closing processor %s due to illegal state or IO exception".format(id)) + swallow(closeAll()) + shutdownComplete() + throw e } + case e: InvalidReceiveException => + // Log warning and continue since Selector already closed the connection + warn("Connection was closed due to invalid receive. Processor will continue handling other connections") } - selector.mute(receive.source) - }) - - collection.JavaConversions.iterableAsScalaIterable(selector.completedSends()).foreach( send => { - val resp = inflightResponses.remove(send.destination()).get - resp.request.updateRequestMetrics() - selector.unmute(send.destination()) - }) + collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach(receive => { + try { + val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT) + requestChannel.sendRequest(req) + } catch { + case e@(_: InvalidRequestException | _: SchemaException) => { + // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier + error("Closing socket for " + receive.source + " because of error", e) + selector.close(receive.source) + } + } + selector.mute(receive.source) + }) + + collection.JavaConversions.iterableAsScalaIterable(selector.completedSends()).foreach(send => { + val resp = inflightResponses.remove(send.destination()).get + resp.request.updateRequestMetrics() + selector.unmute(send.destination()) + }) + } catch { + case e : Throwable => + error("Processor got uncaught exception.", e) + } } - - debug("Closing selector - processor " + id) - closeAll() + swallowError(closeAll()) shutdownComplete() } @@ -426,8 +429,6 @@ private[kafka] class Processor(val id: Int, selector.close(curr.request.connectionId) } } - - } finally { curr = requestChannel.receiveResponse(id) } @@ -448,13 +449,20 @@ private[kafka] class Processor(val id: Int, private def configureNewConnections() { while(!newConnections.isEmpty) { val channel = newConnections.poll() - debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress) - val localHost = channel.socket().getLocalAddress.getHostAddress - val localPort = channel.socket().getLocalPort - val remoteHost = channel.socket().getInetAddress.getHostAddress - val remotePort = channel.socket().getPort - val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort - selector.register(connectionId, channel) + try { + debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress) + val localHost = channel.socket().getLocalAddress.getHostAddress + val localPort = channel.socket().getLocalPort + val remoteHost = channel.socket().getInetAddress.getHostAddress + val remotePort = channel.socket().getPort + val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort + selector.register(connectionId, channel) + } catch { + case e : Throwable => + // need to close the channel here to avoid socket leak. + close(channel) + error("Processor " + id + " closed connection from " + channel.getRemoteAddress, e) + } } } -- 1.8.3.4 (Apple Git-47) From 1ff9d65771819622bdcd2544b1cf9ec76df4442e Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 21 Jul 2015 21:59:20 -0700 Subject: [PATCH 2/2] Addressed Gwen's comments --- core/src/main/scala/kafka/network/SocketServer.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index cbe51c5..8c01fc3 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -37,6 +37,7 @@ import org.apache.kafka.common.protocol.types.SchemaException import org.apache.kafka.common.utils.{SystemTime, Time, Utils} import scala.collection._ +import scala.util.control.ControlThrowable /** * An NIO socket server. The threading model is @@ -366,7 +367,7 @@ private[kafka] class Processor(val id: Int, try { selector.poll(300) } catch { - case e@(_: IllegalStateException | _: IOException) => { + case e @ (_: IllegalStateException | _: IOException) => { error("Closing processor %s due to illegal state or IO exception".format(id)) swallow(closeAll()) shutdownComplete() @@ -381,7 +382,7 @@ private[kafka] class Processor(val id: Int, val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT) requestChannel.sendRequest(req) } catch { - case e@(_: InvalidRequestException | _: SchemaException) => { + case e @ (_: InvalidRequestException | _: SchemaException) => { // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier error("Closing socket for " + receive.source + " because of error", e) selector.close(receive.source) @@ -396,6 +397,7 @@ private[kafka] class Processor(val id: Int, selector.unmute(send.destination()) }) } catch { + case e : ControlThrowable => throw e case e : Throwable => error("Processor got uncaught exception.", e) } @@ -458,7 +460,9 @@ private[kafka] class Processor(val id: Int, val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort selector.register(connectionId, channel) } catch { - case e : Throwable => + case e @ (_: ClosedChannelException | _: ClosedSelectorException | + _: IllegalBlockingModeException | _: IllegalSelectorException | + _: CancelledKeyException | _: java.lang.IllegalArgumentException) => // need to close the channel here to avoid socket leak. close(channel) error("Processor " + id + " closed connection from " + channel.getRemoteAddress, e) -- 1.8.3.4 (Apple Git-47)