Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1395032) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1395033) @@ -93,10 +93,11 @@ public static final byte CURRENT_VERSION = VERSION_RPCOPTIONS; - /** - * How many calls/handler are allowed in the queue. - */ - private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100; + /** + * How much memory do we want for the blocking queue + */ + private static final int MAX_CALL_QUEUE_MEMORY_SIZE = 1024*1024*1024; + private SizeBasedThrottler callQueueThrottler; public static final Log LOG = LogFactory.getLog(HBaseServer.class.getName()); @@ -170,7 +171,6 @@ protected Configuration conf; @SuppressWarnings({"FieldCanBeLocal"}) - private int maxQueueSize; protected int socketSendBufferSize; protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpKeepAlive; // if T then use keepalives @@ -238,7 +238,7 @@ protected Compression.Algorithm compressionAlgo = Compression.Algorithm.NONE; protected int version = CURRENT_VERSION; // version used for the call - + protected boolean shouldProfile = false; protected ProfilingData profilingData = null; protected String tag = null; @@ -252,11 +252,11 @@ this.partialResponseSize = 0; this.timestamp = timestamp; } - + public void setTag(String tag) { this.tag = tag; } - + public String getTag() { return tag; } @@ -311,7 +311,7 @@ private long cleanupInterval = 10000; //the minimum interval between //two cleanup runs private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128); - + private final int readerCount = conf.getInt("ipc.server.reader.count", Runtime.getRuntime().availableProcessors() + 1); // number of reader threads @@ -338,7 +338,7 @@ acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); 
this.setDaemon(true); - + readers = new Reader[readerCount]; for (int i = 0; i < readerCount; i++) { readers[i] = new Reader(i); @@ -582,7 +582,7 @@ interrupt(); } - private void processKey(SelectionKey key) throws InterruptedException{ + private void processKey(SelectionKey key) throws InterruptedException { if (key.isValid()) { if (key.isReadable()) { doRead(key); @@ -592,7 +592,7 @@ } } - private void doRead(SelectionKey key) throws InterruptedException{ + private void doRead(SelectionKey key) throws InterruptedException { int count = 0; Connection c = (Connection)key.attachment(); if (c == null) { @@ -1158,7 +1158,13 @@ } private void queueRawCall() throws InterruptedException { - callQueue.put(new RawCall(this, data)); // queue the call; maybe blocked here + callQueueThrottler.increase(data.limit()); + try { + callQueue.put(new RawCall(this, data)); // queue the call; maybe blocked here + } catch (InterruptedException e) { + callQueueThrottler.decrease(data.limit()); // call was never queued: release the bytes reserved above + throw e; // propagate the interrupt to the caller instead of swallowing it + } } /// Reads the header following version @@ -1206,7 +1212,9 @@ while (running) { try { status.pause("Waiting for a call"); - Call call = callQueue.take().parse(); // pop the queue; maybe blocked here + RawCall rawCall = callQueue.take(); // pop the queue; maybe blocked here + callQueueThrottler.decrease(rawCall.data.limit()); + Call call = rawCall.parse(); status.setStatus("Setting up call"); status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort()); @@ -1217,14 +1225,14 @@ String errorClass = null; String error = null; Writable value = null; - + if (call.shouldProfile) { call.profilingData = new ProfilingData (); } else { call.profilingData = null; } HRegionServer.callContext.set(call); - + CurCall.set(call); UserGroupInformation previous = UserGroupInformation.getCurrentUGI(); UserGroupInformation.setCurrentUser(call.connection.ticket); @@ -1240,13 +1248,13 @@ long total = System.currentTimeMillis () - start; UserGroupInformation.setCurrentUser(previous); CurCall.set(null); - + 
if (call.shouldProfile) { call.profilingData.addLong( ProfilingData.TOTAL_SERVER_TIME_MS, total); } HRegionServer.callContext.remove(); - + int size = BUFFER_INITIAL_SIZE; if (value instanceof WritableWithSize) { // get the size hint. @@ -1272,10 +1280,10 @@ // 1. write call id uncompressed out.writeInt(call.id); - + // 2. write error flag uncompressed out.writeBoolean(error != null); - + if (call.getVersion() >= VERSION_RPCOPTIONS) { // 3. write the compression type for the rest of the response out.writeUTF(call.getRPCCompression().getName()); @@ -1359,8 +1367,8 @@ this.paramClass = paramClass; this.handlerCount = handlerCount; this.socketSendBufferSize = 0; - this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER; - this.callQueue = new LinkedBlockingQueue(maxQueueSize); + this.callQueue = new LinkedBlockingQueue(); + callQueueThrottler = new SizeBasedThrottler(MAX_CALL_QUEUE_MEMORY_SIZE); this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000); this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000); @@ -1572,7 +1580,7 @@ int nBytes = initialRemaining - buf.remaining(); return (nBytes > 0) ? nBytes : ret; } - + /** * Set the OP_READ interest for the selection key * @param selectionKey @@ -1582,7 +1590,7 @@ selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_READ); } } - + /** * Unset the OP_READ interest for the selection key * @param selectionKey