Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1304095) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -75,6 +75,8 @@ import com.google.common.base.Function; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.cliffc.high_scale_lib.Counter; + /** An abstract IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on * a port and is defined by a parameter class and a value class. @@ -95,8 +97,14 @@ /** * How many calls/handler are allowed in the queue. */ - private static final int DEFAULT_MAX_QUEUE_SIZE_PER_HANDLER = 10; + private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; + /** + * The maximum size that we can hold in the IPC queue + */ + private static final int DEFAULT_MAX_CALLQUEUE_SIZE = + 1024 * 1024 * 1024; + static final int BUFFER_INITIAL_SIZE = 1024; private static final String WARN_DELAYED_CALLS = @@ -191,6 +199,7 @@ protected Configuration conf; + private int maxQueueLength; private int maxQueueSize; protected int socketSendBufferSize; protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm @@ -199,6 +208,7 @@ volatile protected boolean running = true; // true while server runs protected BlockingQueue callQueue; // queued calls + protected final Counter callQueueSize = new Counter(); protected BlockingQueue priorityCallQueue; protected int highPriorityLevel; // what level a high priority call is at @@ -257,10 +267,11 @@ protected Responder responder; protected boolean delayReturnValue; // if the return value should be // set at call completion + protected long size; // size of current call protected boolean isError; public Call(int id, Writable param, Connection connection, - Responder responder) { + Responder responder, long size) { this.id = id; this.param = param; this.connection = connection; @@ -269,6 +280,7 @@ this.delayResponse = false; this.responder = responder; this.isError = false; + this.size = size; } @Override @@ -401,6 +413,10 @@ return this.delayReturnValue; } + public long getSize() { + return this.size; + } + /** * If we have a response, and delay is not set, then respond * immediately. Otherwise, do not respond to client. This is @@ -1197,7 +1213,7 @@ // we return 0 which will keep the socket up -- bad clients, unless // they switch to suit the running server -- will fail later doing // getProtocolVersion. - Call fakeCall = new Call(0, null, this, responder); + Call fakeCall = new Call(0, null, this, responder, 0); // Versions 3 and greater can interpret this exception // response in the same manner setupResponse(buffer, fakeCall, Status.FATAL, @@ -1229,10 +1245,24 @@ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); int id = dis.readInt(); // try to read an id + long callSize = buf.length; - if (LOG.isDebugEnabled()) - LOG.debug(" got call #" + id + ", " + buf.length + " bytes"); + if (LOG.isDebugEnabled()) { + LOG.debug(" got call #" + id + ", " + callSize + " bytes"); + } + // Enforcing the call queue size, this triggers a retry in the client + if ((callSize + callQueueSize.get()) > maxQueueSize) { + final Call callTooBig = + new Call(id, null, this, responder, callSize); + ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); + setupResponse(responseBuffer, callTooBig, Status.FATAL, null, + IOException.class.getName(), + "Call queue is full, is ipc.server.max.callqueue.size too small?"); + responder.doRespond(callTooBig); + return; + } + Writable param; try { param = ReflectionUtils.newInstance(paramClass, conf);//read param @@ -1240,7 +1270,8 @@ } catch (Throwable t) { LOG.warn("Unable to read call parameters for client " + getHostAddress(), t); - final Call readParamsFailedCall = new Call(id, null, this, responder); + final Call readParamsFailedCall = + new Call(id, null, this, responder, callSize); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null, @@ -1249,7 +1280,8 @@ responder.doRespond(readParamsFailedCall); return; } - Call call = new Call(id, param, this, responder); + Call call = new Call(id, param, this, responder, callSize); + callQueueSize.add(callSize); if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) { priorityCallQueue.put(call); @@ -1353,7 +1385,7 @@ RequestContext.clear(); } CurCall.set(null); - + callQueueSize.add(call.getSize() * -1); // Set the response for undelayed calls and delayed calls with // undelayed responses. if (!call.isDelayed() || !call.isReturnValueDelayed()) { @@ -1436,15 +1468,29 @@ this.handlerCount = handlerCount; this.priorityHandlerCount = priorityHandlerCount; this.socketSendBufferSize = 0; + + // temporary backward compatibility + String oldMaxQueueSize = this.conf.get("ipc.server.max.queue.size"); + if (oldMaxQueueSize == null) { + this.maxQueueLength = + this.conf.getInt("ipc.server.max.callqueue.length", + handlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + } else { + LOG.warn("ipc.server.max.queue.size was renamed " + + "ipc.server.max.callqueue.length, " + + "please update your configuration"); + this.maxQueueLength = Integer.getInteger(oldMaxQueueSize); + } + this.maxQueueSize = - this.conf.getInt("ipc.server.max.queue.size", - handlerCount * DEFAULT_MAX_QUEUE_SIZE_PER_HANDLER); + this.conf.getInt("ipc.server.max.callqueue.size", + DEFAULT_MAX_CALLQUEUE_SIZE); this.readThreads = conf.getInt( "ipc.server.read.threadpool.size", 10); - this.callQueue = new LinkedBlockingQueue(maxQueueSize); + this.callQueue = new LinkedBlockingQueue(maxQueueLength); if (priorityHandlerCount > 0) { - this.priorityCallQueue = new LinkedBlockingQueue(maxQueueSize); // TODO hack on size + this.priorityCallQueue = new LinkedBlockingQueue(maxQueueLength); // TODO hack on size } else { this.priorityCallQueue = null; }