diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 063c07f..c72662e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -259,15 +259,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {

   protected HBaseRPCErrorHandler errorHandler = null;

+  static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";

   /** Default value for above params */
+  private static final long DEFAULT_MAX_REQUEST_SIZE = 128 * 1024 * 1024L;
   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;

   private static final ObjectMapper MAPPER = new ObjectMapper();

+  private final int maxRequestSize;
   private final int warnResponseTime;
   private final int warnResponseSize;
   private final Server server;
@@ -1204,6 +1207,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     protected String hostAddress;
     protected int remotePort;
     ConnectionHeader connectionHeader;
+
     /**
      * Codec the client asked use.
      */
@@ -1588,11 +1592,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         }
       }
       if (dataLength < 0) { // A data length of zero is legal.
-        throw new IllegalArgumentException("Unexpected data length "
+        throw new DoNotRetryIOException("Unexpected data length "
             + dataLength + "!! from " + getHostAddress());
       }

-      // TODO: check dataLength against some limit so that the client cannot OOM the server
+      if (dataLength > maxRequestSize) {
+        throw new DoNotRetryIOException("RPC data length of " + dataLength + " received from "
+            + getHostAddress() + " is greater than max allowed " + maxRequestSize + ". Set \""
+            + MAX_REQUEST_SIZE + "\" on server to override this limit (not recommended)");
+      }
+
       data = ByteBuffer.allocate(dataLength);

      // Increment the rpc count. This counter will be decreased when we write
@@ -2036,6 +2045,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);

+    // min of 128MB or (Heap memory * 0.2 / # handlers / 2)
+    this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, (int)Math.min(
+        Runtime.getRuntime().maxMemory() * 0.2 / conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+            HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) / 2, DEFAULT_MAX_REQUEST_SIZE));
+
     // Start the listener here and let it bind to the port
     listener = new Listener(name);
     this.port = listener.getAddress().getPort();
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index ffe4d40..e8da9ee 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -57,6 +57,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.http.ConnectionClosedException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -137,13 +138,17 @@
   static class TestRpcServer extends RpcServer {

     TestRpcServer() throws IOException {
-      this(new FifoRpcScheduler(CONF, 1));
+      this(new FifoRpcScheduler(CONF, 1), CONF);
+    }
+
+    TestRpcServer(Configuration conf) throws IOException {
+      this(new FifoRpcScheduler(conf, 1), conf);
     }

-    TestRpcServer(RpcScheduler scheduler) throws IOException {
+    TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
       super(null, "testRpcServer", Lists
           .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
-          "localhost", 0), CONF, scheduler);
+          "localhost", 0), conf, scheduler);
     }

     @Override
@@ -267,7 +272,7 @@
   @Test
   public void testRpcScheduler() throws IOException, InterruptedException {
     RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
-    RpcServer rpcServer = new TestRpcServer(scheduler);
+    RpcServer rpcServer = new TestRpcServer(scheduler, CONF);
     verify(scheduler).init((RpcScheduler.Context) anyObject());
     AbstractRpcClient client = createRpcClient(CONF);
     try {
@@ -292,6 +297,37 @@
     }
   }

+  /** Tests that the RpcServer rejects requests whose size exceeds the configured max request size. */
+  @Test
+  public void testRpcMaxRequestSize() throws IOException, InterruptedException {
+    Configuration conf = new Configuration(CONF);
+    conf.setInt(RpcServer.MAX_REQUEST_SIZE, 100);
+    RpcServer rpcServer = new TestRpcServer(conf);
+    AbstractRpcClient client = createRpcClient(conf);
+    try {
+      rpcServer.start();
+      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+      // set total RPC size bigger than 100 bytes
+      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello.hello.hello.hello."
+          + "hello.hello.hello.hello.hello.hello.hello.hello.hello.hello.hello.hello").build();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
+      try {
+        client.call(new PayloadCarryingRpcController(
+            CellUtil.createCellScanner(ImmutableList.<CellScannable> of(CELL))), md, param,
+            md.getOutputType().toProto(), User.getCurrent(), address,
+            new MetricsConnection.CallStats());
+        fail("RPC should have failed because it exceeds max request size");
+      } catch (ConnectionClosingException | ConnectionClosedException ex) {
+        // pass
+      }
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
   /**
    * Instance of RpcServer that echoes client hostAddress back to client
   */
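
For reference, the default that the constructor hunk above computes is the smaller of 128 MB and roughly (heap memory * 0.2 / handler count / 2). Below is a minimal standalone sketch of that arithmetic, not part of the patch; the 4 GB heap and the handler count of 30 are illustrative assumptions, and the class name is hypothetical.

// Illustrative only: mirrors the default computation the patch adds to RpcServer's constructor.
public class MaxRequestSizeDefaultSketch {
  public static void main(String[] args) {
    final long defaultMaxRequestSize = 128 * 1024 * 1024L; // DEFAULT_MAX_REQUEST_SIZE in the patch
    final double heapBytes = 4L * 1024 * 1024 * 1024;      // assumed -Xmx4g, stand-in for Runtime.getRuntime().maxMemory()
    final int handlerCount = 30;                            // assumed hbase.regionserver.handler.count

    // heap * 0.2 / handlers / 2 = 4096 MB * 0.2 / 30 / 2 is about 13.7 MB, which is below the
    // 128 MB cap, so the heap-derived value becomes the effective default here.
    int maxRequestSize = (int) Math.min(heapBytes * 0.2 / handlerCount / 2, defaultMaxRequestSize);
    System.out.println("effective hbase.ipc.max.request.size default: " + maxRequestSize + " bytes");
  }
}

Clients that legitimately need larger requests can raise hbase.ipc.max.request.size explicitly, as the exception message added in the patch points out.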