diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 8cead2a..96f73fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -51,12 +51,15 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -576,8 +579,7 @@ public class RpcServer implements RpcServerInterface { private ServerSocketChannel acceptChannel = null; //the accept channel private Selector selector = null; //the selector that we use for the server - private Reader[] readers = null; - private int currentReader = 0; + private Reader reader; private Random rand = new Random(); private long lastCleanupRunTime = 0; //the last time when a cleanup connec- //-tion (for idle connections) ran @@ -601,14 +603,12 @@ public class RpcServer implements RpcServerInterface { // create a selector; selector= Selector.open(); - readers = new Reader[readThreads]; + reader = new Reader(); readPool = Executors.newFixedThreadPool(readThreads, new ThreadFactoryBuilder().setNameFormat( "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() + ",port=" + port).setDaemon(true).build()); for (int i = 0; i < readThreads; ++i) { - Reader reader = new Reader(); - readers[i] = reader; readPool.execute(reader); } LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port); @@ -620,74 +620,130 @@ public class RpcServer implements RpcServerInterface { } - private class Reader implements Runnable { - private volatile boolean adding = false; - private final Selector readSelector; + class Reader implements Runnable { + final Selector readSelector; + final Semaphore leaderFollowersSemaphore = new Semaphore(1); + final Queue registrationQueue = new ConcurrentLinkedQueue(); + final Queue selectedKeyQueue = new ConcurrentLinkedQueue(); + final Queue doneKeyQueue = new ConcurrentLinkedQueue(); Reader() throws IOException { this.readSelector = Selector.open(); } + @Override public void run() { try { doRunLoop(); + } catch (InterruptedException ex) { + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + ": InterruptedException in Reader", ex); + } + Thread.currentThread().interrupt(); } finally { try { readSelector.close(); } catch (IOException ioe) { - LOG.error(getName() + ": error closing read selector in " + getName(), ioe); + if (LOG.isErrorEnabled()) { + LOG.error(getName() + ": error closing read selector in " + getName(), ioe); + } } } } - private synchronized void doRunLoop() { + void doRunLoop() throws InterruptedException { while (running) { + leaderFollowersSemaphore.acquire(); + + SelectionKey key = selectedKeyQueue.poll(); + if (key != null) { + processing(key); + continue; + } + + int processingCount = leading(); + + SelectionKey firstKey = selectedKeyQueue.poll(); + leaderFollowersSemaphore.release(processingCount); + processing(firstKey); + } + } + + void processing(SelectionKey key) throws InterruptedException { + doRead(key); + if(key.isValid()) { + doneKeyQueue.add(key); + readSelector.wakeup(); + } + } + + int leading() throws InterruptedException { + while (true) { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + + Connection conn; + while ((conn = registrationQueue.poll()) != null) { + try { + conn.channel.register(readSelector, SelectionKey.OP_READ, conn); + } catch (ClosedChannelException ex) { + if (LOG.isInfoEnabled()) { + LOG.info(getName() + ": Connection cannot be registered in Reader", ex); + } + } + } + + SelectionKey doneKey; + while ((doneKey = doneKeyQueue.poll()) != null) { + try { + doneKey.interestOps(SelectionKey.OP_READ); + } catch (CancelledKeyException e) { + // Other threads can close the channel at any time + // via closeConnection(Connection). + LOG.trace("ignored", e); + } + } + try { - readSelector.select(); - while (adding) { - this.wait(1000); + if (readSelector.select() == 0) { + continue; } + } catch (IOException ex) { + if (LOG.isErrorEnabled()) { + LOG.error(getName() + ": I/O error by selector in Reader", ex); + } + continue; + } - Iterator iter = readSelector.selectedKeys().iterator(); - while (iter.hasNext()) { - SelectionKey key = iter.next(); - iter.remove(); - if (key.isValid()) { - if (key.isReadable()) { - doRead(key); - } + int count = 0; + Iterator iter = readSelector.selectedKeys().iterator(); + while (iter.hasNext()) { + SelectionKey key = iter.next(); + iter.remove(); + if (key.isValid() && key.isReadable()) { + try { + key.interestOps(0); + selectedKeyQueue.add(key); + count++; + } catch (CancelledKeyException e) { + // Other threads can close the channel at any time + // via closeConnection(Connection). + LOG.trace("ignored", e); } } - } catch (InterruptedException e) { - LOG.debug("Interrupted while sleeping"); - return; - } catch (IOException ex) { - LOG.info(getName() + ": IOException in Reader", ex); + } + + if (count > 0) { + return count; } } } - /** - * This gets reader into the state that waits for the new channel - * to be registered with readSelector. If it was waiting in select() - * the thread will be woken up, otherwise whenever select() is called - * it will return even if there is nothing to read and wait - * in while(adding) for finishAdd call - */ - public void startAdd() { - adding = true; + void registerConnection(Connection conn) { + registrationQueue.add(conn); readSelector.wakeup(); } - - public synchronized SelectionKey registerChannel(SocketChannel channel) - throws IOException { - return channel.register(readSelector, SelectionKey.OP_READ); - } - - public synchronized void finishAdd() { - adding = false; - this.notify(); - } } /** cleanup connections from connectionList. Choose a random range @@ -842,22 +898,16 @@ public class RpcServer implements RpcServerInterface { throw ioe; } - Reader reader = getReader(); - try { - reader.startAdd(); - SelectionKey readKey = reader.registerChannel(channel); - c = getConnection(channel, System.currentTimeMillis()); - readKey.attach(c); - synchronized (connectionList) { - connectionList.add(numConnections, c); - numConnections++; - } - if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": connection from " + c.toString() + - "; # active connections: " + numConnections); - } finally { - reader.finishAdd(); + c = getConnection(channel, System.currentTimeMillis()); + reader.registerConnection(c); + + synchronized (connectionList) { + connectionList.add(numConnections, c); + numConnections++; } + if (LOG.isDebugEnabled()) + LOG.debug(getName() + ": connection from " + c.toString() + + "; # active connections: " + numConnections); } } @@ -907,13 +957,6 @@ public class RpcServer implements RpcServerInterface { } readPool.shutdownNow(); } - - // The method that will return the next reader to work with - // Simplistic implementation of round robin for now - Reader getReader() { - currentReader = (currentReader + 1) % readers.length; - return readers[currentReader]; - } } // Sends responses of RPC back to clients.