From d5a0409dfefbbe76c3425c46c4860fa7a5cb78b3 Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Tue, 21 Jul 2015 13:02:18 -0700 Subject: [PATCH 1/4] Added a try-catch to catch any exceptions thrown by the nioSelector --- .../main/scala/kafka/network/SocketServer.scala | 49 +++++++++++++--------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index dbe784b..48b84e6 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -232,28 +232,39 @@ private[kafka] class Acceptor(val host: String, serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT); startupComplete() var currentProcessor = 0 - while(isRunning) { - val ready = nioSelector.select(500) - if(ready > 0) { - val keys = nioSelector.selectedKeys() - val iter = keys.iterator() - while(iter.hasNext && isRunning) { - var key: SelectionKey = null - try { - key = iter.next - iter.remove() - if(key.isAcceptable) - accept(key, processors(currentProcessor)) - else - throw new IllegalStateException("Unrecognized key state for acceptor thread.") - - // round robin to the next processor thread - currentProcessor = (currentProcessor + 1) % processors.length - } catch { - case e: Throwable => error("Error while accepting connection", e) + try + { + while (isRunning) + { + val ready = nioSelector.select(500) + if (ready > 0) + { + val keys = nioSelector.selectedKeys() + val iter = keys.iterator() + while (iter.hasNext && isRunning) + { + var key: SelectionKey = null + try + { + key = iter.next + iter.remove() + if (key.isAcceptable) + accept(key, processors(currentProcessor)) + else + throw new IllegalStateException("Unrecognized key state for acceptor thread.") + + // round robin to the next processor thread + currentProcessor = (currentProcessor + 1) % processors.length + } + catch + { + case e: Throwable => error("Error while accepting connection", e) + } } } } + } catch { + case e : Throwable => error("Error occurred", e) } debug("Closing server socket and selector.") swallowError(serverChannel.close()) -- 1.7.12.4 From a86dac1765662c051d0600d610dcad030c75e3c8 Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Tue, 21 Jul 2015 14:57:16 -0700 Subject: [PATCH 2/4] Added a try-catch to catch any exceptions thrown by the nioSelector --- .../main/scala/kafka/network/SocketServer.scala | 28 +++++++++------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 48b84e6..b40c498 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -232,39 +232,33 @@ private[kafka] class Acceptor(val host: String, serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT); startupComplete() var currentProcessor = 0 - try - { - while (isRunning) - { + while(isRunning) { + try { val ready = nioSelector.select(500) - if (ready > 0) - { + if(ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() - while (iter.hasNext && isRunning) - { + while(iter.hasNext && isRunning) { var key: SelectionKey = null - try - { + try { key = iter.next iter.remove() - if (key.isAcceptable) + if(key.isAcceptable) accept(key, processors(currentProcessor)) else throw new IllegalStateException("Unrecognized key state for acceptor thread.") // round robin to the next processor thread currentProcessor = (currentProcessor + 1) % processors.length + } catch { + case e: Throwable => error("Error while accepting connection", e) } - catch - { - case e: Throwable => error("Error while accepting connection", e) - } } } + } catch { + case e: InterruptedException => debug("Received an interrupt", e) + case e: Throwable => error("Error occurred", e) } - } catch { - case e : Throwable => error("Error occurred", e) } debug("Closing server socket and selector.") swallowError(serverChannel.close()) -- 1.7.12.4 From 288bf657db569acaf63b5317e08b3c762d905d1c Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Thu, 23 Jul 2015 21:33:59 -0700 Subject: [PATCH 3/4] Addressed comments on the Jira ticket --- core/src/main/scala/kafka/network/SocketServer.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index b40c498..b3706cf 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.InvalidReceiveException import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.protocol.types.SchemaException -import org.apache.kafka.common.utils.{SystemTime, Time, Utils} +import org.apache.kafka.common.utils.{Time, Utils} import scala.collection._ import scala.util.control.{NonFatal, ControlThrowable} @@ -256,7 +256,11 @@ private[kafka] class Acceptor(val host: String, } } } catch { + // We catch all the throwables to prevent the acceptor thread from exiting in case if there are + // exceptions due to specific select operation on a channel or a bad request. We don't want the + // the broker to stop responding to requests from other clients. case e: InterruptedException => debug("Received an interrupt", e) + case e: ControlThrowable => throw e case e: Throwable => error("Error occurred", e) } } -- 1.7.12.4 From b7c9caea7a6a9d998c36db23f90ed85cc2386e2c Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Thu, 13 Aug 2015 10:38:31 -0700 Subject: [PATCH 4/4] Addressed Jun's comments --- .../main/scala/kafka/network/SocketServer.scala | 65 +++++++++++----------- 1 file changed, 34 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index b3706cf..b96de53 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -232,42 +232,45 @@ private[kafka] class Acceptor(val host: String, serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT); startupComplete() var currentProcessor = 0 - while(isRunning) { - try { - val ready = nioSelector.select(500) - if(ready > 0) { - val keys = nioSelector.selectedKeys() - val iter = keys.iterator() - while(iter.hasNext && isRunning) { - var key: SelectionKey = null - try { - key = iter.next - iter.remove() - if(key.isAcceptable) - accept(key, processors(currentProcessor)) - else - throw new IllegalStateException("Unrecognized key state for acceptor thread.") - - // round robin to the next processor thread - currentProcessor = (currentProcessor + 1) % processors.length - } catch { - case e: Throwable => error("Error while accepting connection", e) + try { + while (isRunning) { + try { + val ready = nioSelector.select(500) + if (ready > 0) { + val keys = nioSelector.selectedKeys() + val iter = keys.iterator() + while (iter.hasNext && isRunning) { + var key: SelectionKey = null + try { + key = iter.next + iter.remove() + if (key.isAcceptable) + accept(key, processors(currentProcessor)) + else + throw new IllegalStateException("Unrecognized key state for acceptor thread.") + + // round robin to the next processor thread + currentProcessor = (currentProcessor + 1) % processors.length + } catch { + case e: Throwable => error("Error while accepting connection", e) + } } } + } catch { + // We catch all the throwables to prevent the acceptor thread from exiting in case if there are + // exceptions due to specific select operation on a channel or a bad request. We don't want the + // the broker to stop responding to requests from other clients. + case e: InterruptedException => debug("Received an interrupt", e) + case e: ControlThrowable => throw e + case e: Throwable => error("Error occurred", e) } - } catch { - // We catch all the throwables to prevent the acceptor thread from exiting in case if there are - // exceptions due to specific select operation on a channel or a bad request. We don't want the - // the broker to stop responding to requests from other clients. - case e: InterruptedException => debug("Received an interrupt", e) - case e: ControlThrowable => throw e - case e: Throwable => error("Error occurred", e) } + } finally { + debug("Closing server socket and selector.") + swallowError(serverChannel.close()) + swallowError(nioSelector.close()) + shutdownComplete() } - debug("Closing server socket and selector.") - swallowError(serverChannel.close()) - swallowError(nioSelector.close()) - shutdownComplete() } /* -- 1.7.12.4