From f1723648ec5a03156a06f23b79e85c2b92f4226b Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Tue, 21 Jul 2015 13:02:18 -0700 Subject: [PATCH 1/3] Added a try-catch to catch any exceptions thrown by the nioSelector --- .../main/scala/kafka/network/SocketServer.scala | 49 +++++++++++++--------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 91319fa..b1d0595 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -231,28 +231,39 @@ private[kafka] class Acceptor(val host: String, serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT); startupComplete() var currentProcessor = 0 - while(isRunning) { - val ready = nioSelector.select(500) - if(ready > 0) { - val keys = nioSelector.selectedKeys() - val iter = keys.iterator() - while(iter.hasNext && isRunning) { - var key: SelectionKey = null - try { - key = iter.next - iter.remove() - if(key.isAcceptable) - accept(key, processors(currentProcessor)) - else - throw new IllegalStateException("Unrecognized key state for acceptor thread.") - - // round robin to the next processor thread - currentProcessor = (currentProcessor + 1) % processors.length - } catch { - case e: Throwable => error("Error while accepting connection", e) + try + { + while (isRunning) + { + val ready = nioSelector.select(500) + if (ready > 0) + { + val keys = nioSelector.selectedKeys() + val iter = keys.iterator() + while (iter.hasNext && isRunning) + { + var key: SelectionKey = null + try + { + key = iter.next + iter.remove() + if (key.isAcceptable) + accept(key, processors(currentProcessor)) + else + throw new IllegalStateException("Unrecognized key state for acceptor thread.") + + // round robin to the next processor thread + currentProcessor = (currentProcessor + 1) % processors.length + } + catch + { + case e: Throwable => error("Error while accepting connection", e) + } } } } + } catch { + case e : Throwable => error("Error occurred", e) } debug("Closing server socket and selector.") swallowError(serverChannel.close()) -- 1.7.12.4 From 504bb2a88ebc7df2189d30e8d3b0669a4aab2a9f Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Tue, 21 Jul 2015 14:57:16 -0700 Subject: [PATCH 2/3] Added a try-catch to catch any exceptions thrown by the nioSelector --- .../main/scala/kafka/network/SocketServer.scala | 28 +++++++++------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index b1d0595..fb4b575 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -231,39 +231,33 @@ private[kafka] class Acceptor(val host: String, serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT); startupComplete() var currentProcessor = 0 - try - { - while (isRunning) - { + while(isRunning) { + try { val ready = nioSelector.select(500) - if (ready > 0) - { + if(ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() - while (iter.hasNext && isRunning) - { + while(iter.hasNext && isRunning) { var key: SelectionKey = null - try - { + try { key = iter.next iter.remove() - if (key.isAcceptable) + if(key.isAcceptable) accept(key, processors(currentProcessor)) else throw new IllegalStateException("Unrecognized key state for acceptor thread.") // round robin to the next processor thread currentProcessor = (currentProcessor + 1) % processors.length + } catch { + case e: Throwable => error("Error while accepting connection", e) } - catch - { - case e: Throwable => error("Error while accepting connection", e) - } } } + } catch { + case e: InterruptedException => debug("Received an interrupt", e) + case e: Throwable => error("Error occurred", e) } - } catch { - case e : Throwable => error("Error occurred", e) } debug("Closing server socket and selector.") swallowError(serverChannel.close()) -- 1.7.12.4 From 564a3dd3a89812ad9f5b540b639fba746eb56bc2 Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Thu, 23 Jul 2015 21:33:59 -0700 Subject: [PATCH 3/3] Addressed comments on the Jira ticket --- core/src/main/scala/kafka/network/SocketServer.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index fb4b575..6ee20a4 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -34,9 +34,10 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.InvalidReceiveException import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.protocol.types.SchemaException -import org.apache.kafka.common.utils.{SystemTime, Time, Utils} +import org.apache.kafka.common.utils.{Time, Utils} import scala.collection._ +import scala.util.control.ControlThrowable /** * An NIO socket server. The threading model is @@ -255,7 +256,11 @@ private[kafka] class Acceptor(val host: String, } } } catch { + // We catch all the throwables to prevent the acceptor thread from exiting in case if there are + // exceptions due to specific select operation on a channel or a bad request. We don't want the + // the broker to stop responding to requests from other clients. case e: InterruptedException => debug("Received an interrupt", e) + case e: ControlThrowable => throw e case e: Throwable => error("Error occurred", e) } } -- 1.7.12.4