diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 96f506f..110b5e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -183,8 +183,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; public class RpcServer implements RpcServerInterface, ConfigurationObserver { // LOG is being used in CallRunner and the log level is being changed in tests public static final Log LOG = LogFactory.getLog(RpcServer.class); - private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION - = new CallQueueTooBigException(); private final boolean authorize; private boolean isSecurityEnabled; @@ -2225,8 +2223,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { new Call(id, this.service, null, null, null, null, this, responder, totalRequestSize, null, null, 0, this.callCleanup); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); - metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); - setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION, + Exception e = new CallQueueTooBigException("totalRequestSize=" + + totalRequestSize + " + callQueueSizeInBytes.sum()=" + callQueueSizeInBytes.sum() + + "=" + (totalRequestSize + callQueueSizeInBytes.sum()) + " > maxQueueSizeInBytes=" + + maxQueueSizeInBytes); + metrics.exception(e); + setupResponse(responseBuffer, callTooBig, e, "Call queue is full on " + server.getServerName() + ", is hbase.ipc.server.max.callqueue.size too small?"); responder.doRespond(callTooBig); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java index fe988aa..c24e7df 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java @@ -79,6 +79,9 @@ public class TestAsyncGetMultiThread { TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L); TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000); TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100); + TEST_UTIL.getConfiguration().setLong("hbase.ipc.server.max.callqueue.size", + TEST_UTIL.getConfiguration().getLong("hbase.ipc.server.max.callqueue.size", + /*RpcServer#DEFAULT_MAX_CALLQUEUE_SIZE =*/ 1024 * 1024 * 1024) * 10); TEST_UTIL.startMiniCluster(5); SPLIT_KEYS = new byte[8][]; for (int i = 111; i < 999; i += 111) { @@ -104,10 +107,12 @@ public class TestAsyncGetMultiThread { private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException { while (!stop.get()) { int i = ThreadLocalRandom.current().nextInt(COUNT); - assertEquals(i, - Bytes.toInt( - CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get() - .getValue(FAMILY, QUALIFIER))); + byte [] bytes = CONN.getRawTable(TABLE_NAME). + get(new Get(Bytes.toBytes(String.format("%03d", i)))).get().getValue(FAMILY, QUALIFIER); + if (bytes == null) { + if (stop.get()) return; + } + assertEquals(i, Bytes.toInt(bytes)); } }