From f1723648ec5a03156a06f23b79e85c2b92f4226b Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Tue, 21 Jul 2015 13:02:18 -0700 Subject: [PATCH] Added a try-catch to catch any exceptions thrown by the nioSelector --- .../main/scala/kafka/network/SocketServer.scala | 49 +++++++++++++--------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 91319fa..b1d0595 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -231,28 +231,39 @@ private[kafka] class Acceptor(val host: String, serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT); startupComplete() var currentProcessor = 0 - while(isRunning) { - val ready = nioSelector.select(500) - if(ready > 0) { - val keys = nioSelector.selectedKeys() - val iter = keys.iterator() - while(iter.hasNext && isRunning) { - var key: SelectionKey = null - try { - key = iter.next - iter.remove() - if(key.isAcceptable) - accept(key, processors(currentProcessor)) - else - throw new IllegalStateException("Unrecognized key state for acceptor thread.") - - // round robin to the next processor thread - currentProcessor = (currentProcessor + 1) % processors.length - } catch { - case e: Throwable => error("Error while accepting connection", e) + try + { + while (isRunning) + { + val ready = nioSelector.select(500) + if (ready > 0) + { + val keys = nioSelector.selectedKeys() + val iter = keys.iterator() + while (iter.hasNext && isRunning) + { + var key: SelectionKey = null + try + { + key = iter.next + iter.remove() + if (key.isAcceptable) + accept(key, processors(currentProcessor)) + else + throw new IllegalStateException("Unrecognized key state for acceptor thread.") + + // round robin to the next processor thread + currentProcessor = (currentProcessor + 1) % processors.length + } + catch + { + case e: Throwable => error("Error while accepting connection", e) + } } } } + } catch { + case e : Throwable => error("Error occurred", e) } debug("Closing server socket and selector.") swallowError(serverChannel.close()) -- 1.7.12.4