Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1303978) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -54,6 +54,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -95,8 +96,14 @@ /** * How many calls/handler are allowed in the queue. */ - private static final int DEFAULT_MAX_QUEUE_SIZE_PER_HANDLER = 10; + private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; + /** + * The maximum size that we can hold in the IPC queue + */ + private static final int DEFAULT_MAX_CALLQUEUE_SIZE = + 1024 * 1024 * 1024; + static final int BUFFER_INITIAL_SIZE = 1024; private static final String WARN_DELAYED_CALLS = @@ -191,6 +198,7 @@ protected Configuration conf; + private int maxQueueLength; private int maxQueueSize; protected int socketSendBufferSize; protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm @@ -199,6 +207,7 @@ volatile protected boolean running = true; // true while server runs protected BlockingQueue callQueue; // queued calls + protected final AtomicLong callQueueSize = new AtomicLong(); protected BlockingQueue priorityCallQueue; protected int highPriorityLevel; // what level a high priority call is at @@ -257,10 +266,11 @@ protected Responder responder; protected boolean delayReturnValue; // if the return value should be // set at call completion + protected long size; // size of current call protected boolean isError; public Call(int id, Writable param, Connection connection, - Responder responder) { + Responder responder, long size) { this.id = id; this.param = param; this.connection = connection; @@ -269,6 +279,7 @@ this.delayResponse = false; this.responder = responder; this.isError = false; + this.size = size; } @Override @@ -401,6 +412,10 @@ return this.delayReturnValue; } + public long getSize() { + return this.size; + } + /** * If we have a response, and delay is not set, then respond * immediately. Otherwise, do not respond to client. This is @@ -1196,7 +1211,7 @@ // we return 0 which will keep the socket up -- bad clients, unless // they switch to suit the running server -- will fail later doing // getProtocolVersion. - Call fakeCall = new Call(0, null, this, responder); + Call fakeCall = new Call(0, null, this, responder, 0); // Versions 3 and greater can interpret this exception // response in the same manner setupResponse(buffer, fakeCall, Status.FATAL, @@ -1228,10 +1243,23 @@ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); int id = dis.readInt(); // try to read an id + long callSize = buf.length; if (LOG.isDebugEnabled()) - LOG.debug(" got call #" + id + ", " + buf.length + " bytes"); + LOG.debug(" got call #" + id + ", " + callSize + " bytes"); + // Enforcing the call queue size, this triggers a retry in the client + if ((callSize + callQueueSize.get()) > maxQueueSize) { + final Call callTooBig = + new Call(id, null, this, responder, callSize); + ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); + setupResponse(responseBuffer, callTooBig, Status.FATAL, null, + IOException.class.getName(), + "Call queue is full, retry later"); + responder.doRespond(callTooBig); + return; + } + Writable param; try { param = ReflectionUtils.newInstance(paramClass, conf);//read param @@ -1239,7 +1267,8 @@ } catch (Throwable t) { LOG.warn("Unable to read call parameters for client " + getHostAddress(), t); - final Call readParamsFailedCall = new Call(id, null, this, responder); + final Call readParamsFailedCall = + new Call(id, null, this, responder, callSize); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null, @@ -1248,7 +1277,8 @@ responder.doRespond(readParamsFailedCall); return; } - Call call = new Call(id, param, this, responder); + Call call = new Call(id, param, this, responder, callSize); + callQueueSize.addAndGet(callSize); if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) { priorityCallQueue.put(call); @@ -1335,7 +1365,7 @@ RequestContext.clear(); } CurCall.set(null); - + callQueueSize.addAndGet(call.getSize() * -1); // Set the response for undelayed calls and delayed calls with // undelayed responses. if (!call.isDelayed() || !call.isReturnValueDelayed()) { @@ -1413,15 +1443,19 @@ this.handlerCount = handlerCount; this.priorityHandlerCount = priorityHandlerCount; this.socketSendBufferSize = 0; + // TODO add a deprecated comment for ipc.server.max.queue.size + this.maxQueueLength = + this.conf.getInt("ipc.server.max.callqueue.length", + handlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); this.maxQueueSize = - this.conf.getInt("ipc.server.max.queue.size", - handlerCount * DEFAULT_MAX_QUEUE_SIZE_PER_HANDLER); + this.conf.getInt("ipc.server.max.callqueue.size", + DEFAULT_MAX_CALLQUEUE_SIZE); this.readThreads = conf.getInt( "ipc.server.read.threadpool.size", 10); - this.callQueue = new LinkedBlockingQueue(maxQueueSize); + this.callQueue = new LinkedBlockingQueue(maxQueueLength); if (priorityHandlerCount > 0) { - this.priorityCallQueue = new LinkedBlockingQueue(maxQueueSize); // TODO hack on size + this.priorityCallQueue = new LinkedBlockingQueue(maxQueueLength); // TODO hack on size } else { this.priorityCallQueue = null; }