Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 997098) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -58,6 +58,8 @@ import java.util.List; import java.util.Random; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; /** An abstract IPC service. IPC calls take a single {@link Writable} as a @@ -129,6 +131,7 @@ protected String bindAddress; protected int port; // port we listen on private int handlerCount; // number of handler threads + private int readThreads; // number of read threads protected Class paramClass; // class of call parameters protected int maxIdleTime; // the maximum idle time after // which a client may be @@ -227,6 +230,8 @@ private ServerSocketChannel acceptChannel = null; //the accept channel private Selector selector = null; //the selector that we use for the server + private Reader[] readers = null; + private int currentReader = 0; private InetSocketAddress address; //the address we bind at private Random rand = new Random(); private long lastCleanupRunTime = 0; //the last time when a cleanup connec- @@ -235,6 +240,8 @@ //two cleanup runs private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128); + private ExecutorService readPool; + public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode @@ -247,11 +254,86 @@ // create a selector; selector= Selector.open(); + readers = new Reader[readThreads]; + readPool = Executors.newFixedThreadPool(readThreads); + for (int i = 0; i < readThreads; ++i) { + Selector readSelector = Selector.open(); + Reader reader = new Reader(readSelector); + readers[i] = reader; + readPool.execute(reader); + } + // Register accepts on the server socket with the selector. acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); } + + + private class Reader implements Runnable { + private volatile boolean adding = false; + private Selector readSelector = null; + + Reader(Selector readSelector) { + this.readSelector = readSelector; + } + public void run() { + LOG.info("Starting SocketReader"); + synchronized(this) { + while (running) { + SelectionKey key = null; + try { + readSelector.select(); + while (adding) { + this.wait(1000); + } + + Iterator iter = readSelector.selectedKeys().iterator(); + while (iter.hasNext()) { + key = iter.next(); + iter.remove(); + if (key.isValid()) { + if (key.isReadable()) { + doRead(key); + } + } + key = null; + } + } catch (InterruptedException e) { + if (running) { // unexpected -- log it + LOG.info(getName() + "caught: " + + StringUtils.stringifyException(e)); + } + } catch (IOException ex) { + LOG.error("Error in Reader", ex); + } + } + } + } + + /** + * This gets reader into the state that waits for the new channel + * to be registered with readSelector. If it was waiting in select() + * the thread will be woken up, otherwise whenever select() is called + * it will return even if there is nothing to read and wait + * in while(adding) for finishAdd call + */ + public void startAdd() { + adding = true; + readSelector.wakeup(); + } + + public synchronized SelectionKey registerChannel(SocketChannel channel) + throws IOException { + return channel.register(readSelector, SelectionKey.OP_READ); + } + + public synchronized void finishAdd() { + adding = false; + this.notify(); + } + } + /** cleanup connections from connectionList. Choose a random range * to scan and also have a limit on the number of the connections * that will be cleanedup per run. The criteria for cleanup is the time @@ -319,8 +401,6 @@ if (key.isValid()) { if (key.isAcceptable()) doAccept(key); - else if (key.isReadable()) - doRead(key); } } catch (IOException ignored) { } @@ -343,11 +423,6 @@ cleanupConnections(true); try { Thread.sleep(60000); } catch (Exception ignored) {} } - } catch (InterruptedException e) { - if (running) { // unexpected -- log it - LOG.info(getName() + " caught: " + - StringUtils.stringifyException(e)); - } } catch (Exception e) { closeCurrentConnection(key); } @@ -389,25 +464,30 @@ void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c; ServerSocketChannel server = (ServerSocketChannel) key.channel(); - // accept up to 10 connections - for (int i=0; i<10; i++) { - SocketChannel channel = server.accept(); - if (channel==null) return; + SocketChannel channel; + while ((channel = server.accept()) != null) { channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); channel.socket().setKeepAlive(tcpKeepAlive); - SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ); - c = new Connection(channel, System.currentTimeMillis()); - readKey.attach(c); - synchronized (connectionList) { - connectionList.add(numConnections, c); - numConnections++; + + Reader reader = getReader(); + try { + reader.startAdd(); + SelectionKey readKey = reader.registerChannel(channel); + c = new Connection(channel, System.currentTimeMillis()); + readKey.attach(c); + synchronized (connectionList) { + connectionList.add(numConnections, c); + numConnections++; + } + if (LOG.isDebugEnabled()) + LOG.debug("Server connection from " + c.toString() + + "; # active connections: " + numConnections + + "; # queued calls: " + callQueue.size()); + } finally { + reader.finishAdd(); } - if (LOG.isDebugEnabled()) - LOG.debug("Server connection from " + c.toString() + - "; # active connections: " + numConnections + - "; # queued calls: " + callQueue.size()); } } @@ -452,7 +532,15 @@ LOG.info(getName() + ":Exception in closing listener socket. " + e); } } + readPool.shutdownNow(); } + + // The method that will return the next reader to work with + // Simplistic implementation of round robin for now + Reader getReader() { + currentReader = (currentReader + 1) % readers.length; + return readers[currentReader]; + } } // Sends responses of RPC back to clients. @@ -993,6 +1081,9 @@ this.handlerCount = handlerCount; this.socketSendBufferSize = 0; this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER; + this.readThreads = conf.getInt( + "ipc.server.read.threadpool.size", + 1); this.callQueue = new LinkedBlockingQueue(maxQueueSize); this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000); this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);