diff --git a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java index e630125..c5fe8c1 100644 --- a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java +++ b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java @@ -612,15 +612,29 @@ public abstract class SecureServer extends HBaseServer { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); int id = dis.readInt(); // try to read an id + long callSize = buf.length; if (LOG.isTraceEnabled()) { LOG.trace(" got #" + id); } + // Enforcing the call queue size, this triggers a retry in the client + if ((callSize + callQueueSize.get()) > maxQueueSize) { + final SecureCall callTooBig = + new SecureCall(id, null, this, responder, callSize); + ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); + setupResponse(responseBuffer, callTooBig, Status.FATAL, null, + IOException.class.getName(), + "Call queue is full, is ipc.server.max.callqueue.size too small?"); + responder.doRespond(callTooBig); + return; + } + Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param param.readFields(dis); - SecureCall call = new SecureCall(id, param, this, responder, buf.length); + SecureCall call = new SecureCall(id, param, this, responder, callSize); + callQueueSize.add(callSize); if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) { priorityCallQueue.put(call); @@ -757,4 +771,4 @@ public abstract class SecureServer extends HBaseServer { protocol, getConf(), addr); } } -} \ No newline at end of file +} diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 261ce4a..417e2a2 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -204,7 +204,7 @@ public abstract class HBaseServer implements RpcServer { protected Configuration conf; private int maxQueueLength; - private int maxQueueSize; + protected int maxQueueSize; protected int socketSendBufferSize; protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpKeepAlive; // if T then use keepalives @@ -1606,7 +1606,7 @@ public abstract class HBaseServer implements RpcServer { * @param error error message, if the call failed * @throws IOException */ - private void setupResponse(ByteArrayOutputStream response, + protected void setupResponse(ByteArrayOutputStream response, Call call, Status status, Writable rv, String errorClass, String error) throws IOException {