From f1723648ec5a03156a06f23b79e85c2b92f4226b Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Tue, 21 Jul 2015 13:02:18 -0700 Subject: [PATCH 1/2] Added a try-catch to catch any exceptions thrown by the nioSelector --- .../main/scala/kafka/network/SocketServer.scala | 49 +++++++++++++--------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 91319fa..b1d0595 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -231,28 +231,39 @@ private[kafka] class Acceptor(val host: String, serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT); startupComplete() var currentProcessor = 0 - while(isRunning) { - val ready = nioSelector.select(500) - if(ready > 0) { - val keys = nioSelector.selectedKeys() - val iter = keys.iterator() - while(iter.hasNext && isRunning) { - var key: SelectionKey = null - try { - key = iter.next - iter.remove() - if(key.isAcceptable) - accept(key, processors(currentProcessor)) - else - throw new IllegalStateException("Unrecognized key state for acceptor thread.") - - // round robin to the next processor thread - currentProcessor = (currentProcessor + 1) % processors.length - } catch { - case e: Throwable => error("Error while accepting connection", e) + try + { + while (isRunning) + { + val ready = nioSelector.select(500) + if (ready > 0) + { + val keys = nioSelector.selectedKeys() + val iter = keys.iterator() + while (iter.hasNext && isRunning) + { + var key: SelectionKey = null + try + { + key = iter.next + iter.remove() + if (key.isAcceptable) + accept(key, processors(currentProcessor)) + else + throw new IllegalStateException("Unrecognized key state for acceptor thread.") + + // round robin to the next processor thread + currentProcessor = (currentProcessor + 1) % processors.length + } + catch + { + case e: Throwable => error("Error while accepting connection", e) + } } } } + } catch { + case e : Throwable => error("Error occurred", e) } debug("Closing server socket and selector.") swallowError(serverChannel.close()) -- 1.7.12.4 From 504bb2a88ebc7df2189d30e8d3b0669a4aab2a9f Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Tue, 21 Jul 2015 14:57:16 -0700 Subject: [PATCH 2/2] Added a try-catch to catch any exceptions thrown by the nioSelector --- .../main/scala/kafka/network/SocketServer.scala | 28 +++++++++------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index b1d0595..fb4b575 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -231,39 +231,33 @@ private[kafka] class Acceptor(val host: String, serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT); startupComplete() var currentProcessor = 0 - try - { - while (isRunning) - { + while(isRunning) { + try { val ready = nioSelector.select(500) - if (ready > 0) - { + if(ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() - while (iter.hasNext && isRunning) - { + while(iter.hasNext && isRunning) { var key: SelectionKey = null - try - { + try { key = iter.next iter.remove() - if (key.isAcceptable) + if(key.isAcceptable) accept(key, processors(currentProcessor)) else throw new IllegalStateException("Unrecognized key state for acceptor thread.") // round robin to the next processor thread currentProcessor = (currentProcessor + 1) % processors.length + } catch { + case e: Throwable => error("Error while accepting connection", e) } - catch - { - case e: Throwable => error("Error while accepting connection", e) - } } } + } catch { + case e: InterruptedException => debug("Received an interrupt", e) + case e: Throwable => error("Error occurred", e) } - } catch { - case e : Throwable => error("Error occurred", e) } debug("Closing server socket and selector.") swallowError(serverChannel.close()) -- 1.7.12.4