From 697f4a295b4eac2f6875b0a42e3a52556180ee62 Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Mon, 24 Aug 2015 15:50:10 -0700 Subject: [PATCH] Addressed Joel's comments --- .../main/scala/kafka/network/SocketServer.scala | 60 +++++++++++++--------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 649812d..c059c32 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -253,34 +253,46 @@ private[kafka] class Acceptor(val host: String, serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT); startupComplete() var currentProcessor = processorBeginIndex - while(isRunning) { - val ready = nioSelector.select(500) - if(ready > 0) { - val keys = nioSelector.selectedKeys() - val iter = keys.iterator() - while(iter.hasNext && isRunning) { - var key: SelectionKey = null - try { - key = iter.next - iter.remove() - if(key.isAcceptable) - accept(key, processors(currentProcessor)) - else - throw new IllegalStateException("Unrecognized key state for acceptor thread.") - - // round robin to the next processor thread - currentProcessor = (currentProcessor + 1) % processorEndIndex - if (currentProcessor < processorBeginIndex) currentProcessor = processorBeginIndex - } catch { - case e: Throwable => error("Error while accepting connection", e) + try { + while (isRunning) { + try { + val ready = nioSelector.select(500) + if (ready > 0) { + val keys = nioSelector.selectedKeys() + val iter = keys.iterator() + while (iter.hasNext && isRunning) { + var key: SelectionKey = null + try { + key = iter.next + iter.remove() + if (key.isAcceptable) + accept(key, processors(currentProcessor)) + else + throw new IllegalStateException("Unrecognized key state for acceptor thread.") + + // round robin to the next processor thread + currentProcessor = (currentProcessor + 1) % processorEndIndex + if (currentProcessor < processorBeginIndex) currentProcessor = processorBeginIndex + } catch { + case e: Throwable => error("Error while accepting connection", e) + } + } } } + catch { + // We catch all the throwables to prevent the acceptor thread from exiting in case if there are + // exceptions due to specific select operation on a channel or a bad request. We don't want the + // the broker to stop responding to requests from other clients. + case e: ControlThrowable => throw e + case e: Throwable => error("Error occurred", e) + } } + } finally { + debug("Closing server socket and selector.") + swallowError(serverChannel.close()) + swallowError(nioSelector.close()) + shutdownComplete() } - debug("Closing server socket and selector.") - swallowError(serverChannel.close()) - swallowError(nioSelector.close()) - shutdownComplete() } /* -- 1.7.12.4